This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7e936e7 Initial client handler correctly sets stream id on responses
7e936e7 is described below
commit 7e936e7f2c6ccc73d8e3acd31d7050889ec1efbe
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Jan 13 10:50:17 2021 +0000
Initial client handler correctly sets stream id on responses
Patch by Sam Tunnicliffe; reviewed by Mick Semb Wever for CASSANDRA-16376
---
CHANGES.txt | 1 +
.../transport/InitialConnectionHandler.java | 1 +
.../apache/cassandra/transport/SimpleClient.java | 4 +-
.../apache/cassandra/transport/BurnTestUtil.java | 2 +-
.../transport/ProtocolNegotiationTest.java | 101 ++++++++++++++++++---
5 files changed, 96 insertions(+), 13 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fbb1bd1..47b47d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Ensure pre-negotiation native protocol responses have correct stream id
(CASSANDRA-16376)
* Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
* SSLFactory should initialize SSLContext before setting protocols
(CASSANDRA-16362)
* Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in
the cassandra-all pom (CASSANDRA-16303)
diff --git
a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 70237c5..77e9232 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -89,6 +89,7 @@ public class InitialConnectionHandler extends
ByteToMessageDecoder
supportedOptions.put(StartupMessage.COMPRESSION,
compressions);
supportedOptions.put(StartupMessage.PROTOCOL_VERSIONS,
ProtocolVersion.supportedVersions());
SupportedMessage supported = new
SupportedMessage(supportedOptions);
+ supported.setStreamId(inbound.header.streamId);
outbound = supported.encode(inbound.header.version);
ctx.writeAndFlush(outbound);
break;
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 43807a8..5ad4c17 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -200,7 +201,8 @@ public class SimpleClient implements Closeable
responseHandler.eventHandler = eventHandler;
}
- protected void establishConnection() throws IOException
+ @VisibleForTesting
+ void establishConnection() throws IOException
{
// Configure the client.
bootstrap = new Bootstrap()
diff --git a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
index e7798e1..e7bf6b8 100644
--- a/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
+++ b/test/burn/org/apache/cassandra/transport/BurnTestUtil.java
@@ -62,7 +62,7 @@ public class BurnTestUtil
for (int i = 0; i < sizeCaps.columnCountCap; i++)
values[i] = bytes(rnd, sizeCaps.valueMinSize,
sizeCaps.valueMaxSize);
- return new SimpleStatement(Integer.toString(idx), values);
+ return new SimpleStatement(Integer.toString(idx), (Object[]) values);
}
public static QueryMessage generateQueryMessage(int idx, SizeCaps sizeCaps)
diff --git
a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
index 54a68bc..f33d8e6 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolNegotiationTest.java
@@ -18,17 +18,36 @@
package org.apache.cassandra.transport;
+import java.io.IOException;
+import java.util.Random;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Session;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.transport.messages.OptionsMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.StartupMessage;
+import static com.datastax.driver.core.ProtocolVersion.NEWEST_BETA;
+import static com.datastax.driver.core.ProtocolVersion.NEWEST_SUPPORTED;
+import static com.datastax.driver.core.ProtocolVersion.V1;
+import static com.datastax.driver.core.ProtocolVersion.V2;
+import static com.datastax.driver.core.ProtocolVersion.V3;
+import static com.datastax.driver.core.ProtocolVersion.V4;
+import static com.datastax.driver.core.ProtocolVersion.V5;
+import static
org.apache.cassandra.transport.messages.StartupMessage.CQL_VERSION;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
public class ProtocolNegotiationTest extends CQLTester
{
// to avoid JMX naming clashes between cluster metrics
@@ -40,33 +59,93 @@ public class ProtocolNegotiationTest extends CQLTester
requireNetwork();
}
+ @Before
+ public void initNetwork()
+ {
+ reinitializeNetwork();
+ }
+
@Test
public void serverSupportsV3AndV4ByDefault()
{
- reinitializeNetwork();
// client can explicitly request either V3 or V4
- testConnection(ProtocolVersion.V3, ProtocolVersion.V3);
- testConnection(ProtocolVersion.V4, ProtocolVersion.V4);
+ testConnection(V3, V3);
+ testConnection(V4, V4);
// if not specified, V4 is the default
- testConnection(null, ProtocolVersion.V4);
- testConnection(ProtocolVersion.NEWEST_SUPPORTED, ProtocolVersion.V4);
+ testConnection(null, V4);
+ testConnection(NEWEST_SUPPORTED, V4);
}
@Test
public void supportV5ConnectionWithBetaOption()
{
- reinitializeNetwork();
- testConnection(ProtocolVersion.V5, ProtocolVersion.V5);
- testConnection(ProtocolVersion.NEWEST_BETA, ProtocolVersion.V5);
+ testConnection(V5, V5);
+ testConnection(NEWEST_BETA, V5);
}
@Test
public void olderVersionsAreUnsupported()
{
+ testConnection(V1, V4);
+ testConnection(V2, V4);
+ }
+
+ @Test
+ public void preNegotiationResponsesHaveCorrectStreamId()
+ {
+
ProtocolVersion.SUPPORTED.forEach(this::testStreamIdsAcrossNegotiation);
+ }
+
+ private void testStreamIdsAcrossNegotiation(ProtocolVersion version)
+ {
+ long seed = System.currentTimeMillis();
+ Random random = new Random(seed);
reinitializeNetwork();
- testConnection(ProtocolVersion.V1, ProtocolVersion.V4);
- testConnection(ProtocolVersion.V2, ProtocolVersion.V4);
+ SimpleClient.Builder builder =
SimpleClient.builder(nativeAddr.getHostAddress(), nativePort);
+ if (version.isBeta())
+ builder.useBeta();
+ else
+ builder.protocolVersion(version);
+
+ try (SimpleClient client = builder.build())
+ {
+ client.establishConnection();
+ // Before STARTUP the client hasn't yet negotiated a protocol
version.
+ // All OPTIONS messages are received by the initial connection
handler.
+ OptionsMessage options = new OptionsMessage();
+ for (int i = 0; i < 100; i++)
+ {
+ int streamId = random.nextInt(254) + 1;
+ options.setStreamId(streamId);
+ Message.Response response = client.execute(options);
+ assertEquals(String.format("StreamId mismatch; version: %s,
seed: %s, iter: %s, expected: %s, actual: %s",
+ version, seed, i, streamId,
response.getStreamId()),
+ streamId, response.getStreamId());
+ }
+
+ int streamId = random.nextInt(254) + 1;
+ // STARTUP messages are handled by the initial connection handler
+ StartupMessage startup = new
StartupMessage(ImmutableMap.of(CQL_VERSION,
QueryProcessor.CQL_VERSION.toString()));
+ startup.setStreamId(streamId);
+ Message.Response response = client.execute(startup);
+ assertEquals(String.format("StreamId mismatch after negotiation;
version: %s, expected: %s, actual %s",
+ version, streamId,
response.getStreamId()),
+ streamId, response.getStreamId());
+
+ // Following STARTUP, the version specific handlers are fully
responsible for processing messages
+ QueryMessage query = new QueryMessage("SELECT * FROM
system.local", QueryOptions.DEFAULT);
+ query.setStreamId(streamId);
+ response = client.execute(query);
+ assertEquals(String.format("StreamId mismatch after negotiation;
version: %s, expected: %s, actual %s",
+ version, streamId,
response.getStreamId()),
+ streamId, response.getStreamId());
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ fail("Error establishing connection");
+ }
}
private void testConnection(com.datastax.driver.core.ProtocolVersion
requestedVersion,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]