This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 678ca3f Update port when reconnecting to pre-4.0 SSL storage
678ca3f is described below
commit 678ca3fc29c38b64a110dcf40693aa7840b0585c
Author: Jon Meredith <[email protected]>
AuthorDate: Tue Apr 7 18:58:59 2020 -0600
Update port when reconnecting to pre-4.0 SSL storage
On a failed outbound connection to a node with pending data, recheck
the messaging version before reattempting the connection.
Prior to this change, if the endpoint version was incorrectly set
to 4.0 when the node was running 3.0 with an SSL storage port
the connection would continuously try to reconnect on the wrong port.
The patch also improves some of the log messages to include the
actual port being connected to as well as the canonical endpoint for
the node.
Patch by Jon Meredith & Andy Tolbert; reviewed by Aleksey Yeschenko for
CASSANDRA-15727
Co-authored-by: Jon Meredith <[email protected]>
Co-authored-by: Andy Tolbert <[email protected]>
---
CHANGES.txt | 1 +
.../cassandra/net/InboundConnectionInitiator.java | 4 +-
.../org/apache/cassandra/net/InboundSockets.java | 19 ++++-
.../apache/cassandra/net/OutboundConnection.java | 20 ++++-
.../cassandra/net/OutboundConnectionInitiator.java | 10 +--
.../cassandra/net/OutboundConnectionSettings.java | 7 ++
.../org/apache/cassandra/net/ConnectionTest.java | 92 ++++++++++++++++++++++
7 files changed, 142 insertions(+), 11 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e6acf40..98b3f68 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Update port when reconnecting to pre-4.0 SSL storage (CASSANDRA-15727)
* Only calculate dynamicBadnessThreshold once per loop in
DynamicEndpointSnitch (CASSANDRA-15798)
* Cleanup redundant nodetool commands added in 4.0 (CASSANDRA-15256)
* Update to Python driver 3.23 for cqlsh (CASSANDRA-15793)
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index c390ba4..3c1498b 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -239,8 +239,8 @@ public class InboundConnectionInitiator
if (sslHandler != null)
{
SSLSession session = sslHandler.engine().getSession();
- logger.info("connection from peer {}, protocol = {}, cipher
suite = {}",
- ctx.channel().remoteAddress(),
session.getProtocol(), session.getCipherSuite());
+ logger.info("connection from peer {} to {}, protocol = {},
cipher suite = {}",
+ ctx.channel().remoteAddress(),
ctx.channel().localAddress(), session.getProtocol(), session.getCipherSuite());
}
}
diff --git a/src/java/org/apache/cassandra/net/InboundSockets.java
b/src/java/org/apache/cassandra/net/InboundSockets.java
index 8f74eaa..eb9ef8e 100644
--- a/src/java/org/apache/cassandra/net/InboundSockets.java
+++ b/src/java/org/apache/cassandra/net/InboundSockets.java
@@ -187,10 +187,23 @@ class InboundSockets
private static void addBindings(InboundConnectionSettings template,
ImmutableList.Builder<InboundSocket> out)
{
- InboundConnectionSettings settings = template.withDefaults();
- out.add(new InboundSocket(settings));
+ InboundConnectionSettings settings = template.withDefaults();
+ InboundConnectionSettings legacySettings =
template.withLegacyDefaults();
+
if (settings.encryption.enable_legacy_ssl_storage_port &&
settings.encryption.enabled)
- out.add(new InboundSocket(template.withLegacyDefaults()));
+ {
+ out.add(new InboundSocket(legacySettings));
+
+ /*
+ * If the legacy ssl storage port and storage port match, only
bind to the
+ * legacy ssl port. This makes it possible to configure a 4.0 node
like a 3.0
+ * node with only the ssl_storage_port if required.
+ */
+ if (settings.bindAddress.equals(legacySettings.bindAddress))
+ return;
+ }
+
+ out.add(new InboundSocket(settings));
}
public Future<Void> open(Consumer<ChannelPipeline> pipelineInjector)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java
b/src/java/org/apache/cassandra/net/OutboundConnection.java
index b84ebc3..315d086 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -1084,7 +1084,25 @@ public class OutboundConnection
if (hasPending())
{
Promise<Result<MessagingSuccess>> result = new
AsyncPromise<>(eventLoop);
- state = new Connecting(state.disconnected(), result,
eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis),
MILLISECONDS));
+ state = new Connecting(state.disconnected(),
+ result,
+ eventLoop.schedule(() ->
+ {
+ // Re-evaluate messagingVersion
before re-attempting the connection in case
+ // endpointToVersion were
updated. This happens if the outbound connection
+ // is made before the
endpointToVersion table is initially constructed or out
+ // of date (e.g. if outbound
connections are established for gossip
+ // as a result of an inbound
connection) and can result in the wrong outbound
+ // port being selected if
configured with enable_legacy_ssl_storage_port=true.
+ int maybeUpdatedVersion =
template.endpointToVersion().get(template.to);
+ if (maybeUpdatedVersion !=
messagingVersion)
+ {
+ logger.trace("Endpoint
version changed from {} to {} since connection initialized, updating.",
+
messagingVersion, maybeUpdatedVersion);
+ messagingVersion =
maybeUpdatedVersion;
+ }
+ attempt(result);
+ }, max(100, retryRateMillis),
MILLISECONDS));
retryRateMillis = min(1000, retryRateMillis * 2);
}
else
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index fdfb2df..5f3eced 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -131,7 +131,7 @@ public class OutboundConnectionInitiator<SuccessType
extends OutboundConnectionI
{
// interrupt other connections, so they must attempt to
re-authenticate
MessagingService.instance().interruptOutbound(settings.to);
- return new FailedFuture<>(eventLoop, new
IOException("authentication failed to " + settings.to));
+ return new FailedFuture<>(eventLoop, new
IOException("authentication failed to " + settings.connectToId()));
}
// this is a bit ugly, but is the easiest way to ensure that if we
timeout we can propagate a suitable error message
@@ -146,7 +146,7 @@ public class OutboundConnectionInitiator<SuccessType
extends OutboundConnectionI
if (future.isCancelled() &&
!timedout.get())
resultPromise.cancel(true);
else if (future.isCancelled())
- resultPromise.tryFailure(new
IOException("Timeout handshaking with " + settings.connectTo));
+ resultPromise.tryFailure(new
IOException("Timeout handshaking with " + settings.connectToId()));
else
resultPromise.tryFailure(future.cause());
}
@@ -229,7 +229,7 @@ public class OutboundConnectionInitiator<SuccessType
extends OutboundConnectionI
public void channelActive(final ChannelHandlerContext ctx)
{
Initiate msg = new Initiate(requestMessagingVersion,
settings.acceptVersions, type, settings.framing, settings.from);
- logger.trace("starting handshake with peer {}, msg = {}",
settings.connectTo, msg);
+ logger.trace("starting handshake with peer {}, msg = {}",
settings.connectToId(), msg);
AsyncChannelPromise.writeAndFlush(ctx, msg.encode(),
future -> { if (!future.isSuccess()) exceptionCaught(ctx,
future.cause()); });
@@ -368,9 +368,9 @@ public class OutboundConnectionInitiator<SuccessType
extends OutboundConnectionI
JVMStabilityInspector.inspectThrowable(cause, false);
resultPromise.tryFailure(cause);
if (isCausedByConnectionReset(cause))
- logger.info("Failed to connect to peer {}", settings.to,
cause);
+ logger.info("Failed to connect to peer {}",
settings.connectToId(), cause);
else
- logger.error("Failed to handshake with peer {}",
settings.to, cause);
+ logger.error("Failed to handshake with peer {}",
settings.connectToId(), cause);
ctx.close();
}
catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
index c78df61..5f83b6a 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
@@ -449,6 +449,13 @@ public class OutboundConnectionSettings
return connectTo;
}
+ public String connectToId()
+ {
+ return !to.equals(connectTo())
+ ? to.toString()
+ : to.toString() + '(' + connectTo().toString() + ')';
+ }
+
public Framing framing(ConnectionCategory category)
{
if (framing != null)
diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java
b/test/unit/org/apache/cassandra/net/ConnectionTest.java
index e92f196..c8dc369 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java
@@ -55,6 +55,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -70,6 +71,7 @@ import org.apache.cassandra.utils.FBUtilities;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.net.MessagingService.VERSION_30;
+import static org.apache.cassandra.net.MessagingService.VERSION_3014;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.MessagingService.current_version;
@@ -568,6 +570,96 @@ public class ConnectionTest
}
@Test
+ public void
testPendingOutboundConnectionUpdatesMessageVersionOnReconnectAttempt() throws
Throwable
+ {
+ final String storagePortProperty = Config.PROPERTY_PREFIX +
"ssl_storage_port";
+ final String originalStoragePort =
System.getProperty(storagePortProperty);
+ try
+ {
+ // Set up an inbound connection listening *only* on the SSL
storage port to
+ // replicate a 3.x node. Force the messaging version to be
incorrectly set to 4.0
+ // before the outbound connection attempt.
+ final Settings settings = Settings.LARGE;
+ final InetAddressAndPort endpoint =
FBUtilities.getBroadcastAddressAndPort();
+
+
MessagingService.instance().versions.set(FBUtilities.getBroadcastAddressAndPort(),
+
MessagingService.VERSION_40);
+
+ System.setProperty(storagePortProperty, "7011");
+ final InetAddressAndPort legacySSLAddrsAndPort =
endpoint.withPort(DatabaseDescriptor.getSSLStoragePort());
+ InboundConnectionSettings inboundSettings =
settings.inbound.apply(new
InboundConnectionSettings().withEncryption(encryptionOptions))
+
.withBindAddress(legacySSLAddrsAndPort)
+
.withAcceptMessaging(new AcceptVersions(VERSION_30, VERSION_3014))
+
.withSocketFactory(factory);
+ InboundSockets inbound = new
InboundSockets(Collections.singletonList(inboundSettings));
+ OutboundConnectionSettings outboundTemplate =
settings.outbound.apply(new
OutboundConnectionSettings(endpoint).withEncryption(encryptionOptions))
+
.withDefaultReserveLimits()
+
.withSocketFactory(factory)
+
.withDefaults(ConnectionCategory.MESSAGING);
+ ResourceLimits.EndpointAndGlobal reserveCapacityInBytes = new
ResourceLimits.EndpointAndGlobal(new
ResourceLimits.Concurrent(outboundTemplate.applicationSendQueueReserveEndpointCapacityInBytes),
outboundTemplate.applicationSendQueueReserveGlobalCapacityInBytes);
+ OutboundConnection outbound = new
OutboundConnection(settings.type, outboundTemplate, reserveCapacityInBytes);
+ try
+ {
+ logger.info("Running {} {} -> {}",
outbound.messagingVersion(), outbound.settings(), inboundSettings);
+ inbound.open().sync();
+
+ CountDownLatch done = new CountDownLatch(1);
+ unsafeSetHandler(Verb._TEST_1,
+ () -> (msg) -> done.countDown());
+
+ // Enqueuing outbound message will initiate an outbound
+ // connection with pending data in the pipeline
+ Message<?> message = Message.out(Verb._TEST_1, noPayload);
+ outbound.enqueue(message);
+
+ // Wait until the first connection attempt has taken place
+ // before updating the endpoint messaging version so that the
+ // connection takes place to a 4.0 node.
+ int attempts = 0;
+ final long waitForAttemptMillis =
TimeUnit.SECONDS.toMillis(15);
+ while (outbound.connectionAttempts() == 0 && attempts <
waitForAttemptMillis / 10)
+ {
+ Uninterruptibles.sleepUninterruptibly(10,
TimeUnit.MILLISECONDS);
+ attempts++;
+ }
+
+ // Now that the connection is being attempted, set the
endpoint version so
+ // that on the reconnect attempt the messaging version is
rechecked and the
+ // legacy ssl logic picks the storage port instead. This
should trigger a
+ // TRACE level log message "Endpoint version changed from 12
to 10 since
+ // connection initialized, updating."
+ outbound.settings().endpointToVersion.set(endpoint,
VERSION_30);
+
+ // The connection should have successfully connected and
delivered the _TEST_1
+ // message within the timout.
+ Assert.assertTrue(done.await(15, SECONDS));
+ Assert.assertTrue(outbound.isConnected());
+ Assert.assertTrue(String.format("expect less successful
connections (%d) than attempts (%d)",
+
outbound.successfulConnections(), outbound.connectionAttempts()),
+ outbound.successfulConnections() <
outbound.connectionAttempts());
+
+ }
+ finally
+ {
+ outbound.close(false);
+ inbound.close().get(30L, SECONDS);
+ outbound.close(false).get(30L, SECONDS);
+ resetVerbs();
+ MessagingService.instance().messageHandlers.clear();
+ }
+ }
+ finally
+ {
+
MessagingService.instance().versions.set(FBUtilities.getBroadcastAddressAndPort(),
+ current_version);
+ if (originalStoragePort != null)
+ System.setProperty(storagePortProperty, originalStoragePort);
+ else
+ System.clearProperty(storagePortProperty);
+ }
+ }
+
+ @Test
public void testCloseIfEndpointDown() throws Throwable
{
testManual((settings, inbound, outbound, endpoint) -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]