This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch jdbc_over_thin_sql
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/jdbc_over_thin_sql by this
push:
new 15c8d0a01d9 IGNITE-26146 Sql. Jdbc. Abort the connection to a node
that does not support a feature required by the new JDBC driver (#6566)
15c8d0a01d9 is described below
commit 15c8d0a01d9eff5418619d387aef4492ec34c99d
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Nov 4 10:00:22 2025 +0300
IGNITE-26146 Sql. Jdbc. Abort the connection to a node that does not
support a feature required by the new JDBC driver (#6566)
---
.../ignite/internal/client/ChannelValidator.java | 50 ++++
.../ignite/internal/client/ProtocolContext.java | 18 +-
.../ignite/internal/client/ReliableChannel.java | 17 +-
.../ignite/internal/client/TcpClientChannel.java | 16 +-
.../ignite/internal/client/TcpIgniteClient.java | 23 +-
modules/compatibility-tests/build.gradle | 1 +
...cOverThinSqlWithOldServerCompatibilityTest.java | 82 ++++++
.../org/apache/ignite/internal/IgniteCluster.java | 7 +-
.../ignite/internal/jdbc/JdbcConnection.java | 2 +-
.../org/apache/ignite/jdbc/IgniteJdbcDriver.java | 30 ++-
.../client/ItThinClientChannelValidatorTest.java | 274 +++++++++++++++++++++
11 files changed, 500 insertions(+), 20 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ChannelValidator.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ChannelValidator.java
new file mode 100644
index 00000000000..c3a9458b471
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ChannelValidator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.RetryPolicy;
+
+/**
+ * Validator of the connection channel.
+ *
+ * <p>Performs protocol context validation when a connection is established.
+ */
+@FunctionalInterface
+public interface ChannelValidator {
+ /**
+ * Validates connection channel when a connection to a node is established,
+ *
+ * <p>Any exception thrown from this method indicates that validation
failed
+ * and the connection channel to this node will be closed. However, if,
after
+ * an unsuccessful validation, it is necessary to retry connections
according
+ * to the configured {@link RetryPolicy}, then the exception must be an
+ * instance of {@link IgniteClientConnectionException}.
+ *
+ * <p>If {@link RetryPolicy} is not specified, then after the first
unsuccessful validation
+ * the client will be closed with an exception received from the validator.
+ *
+ * <p>If {@link RetryPolicy} is specified, the client will attempt to
reconnect to cluster
+ * nodes according to the configured policy. If the connection is not
established within
+ * the policy-defined number of attempts due to channel validation, the
client will be closed
+ * with an exception received from the validator.
+ *
+ * @param ctx Protocol context.
+ */
+ void validate(ProtocolContext ctx);
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
index 5c66d455f06..a2fae4cb15e 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import
org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.network.ClusterNode;
/**
@@ -49,6 +50,9 @@ public class ProtocolContext {
/** Cluster name. */
private final String clusterName;
+ /** Cluster node product version. */
+ private final IgniteProductVersion nodeProductVersion;
+
/**
* Constructor.
*
@@ -58,6 +62,7 @@ public class ProtocolContext {
* @param clusterNode Cluster node.
* @param clusterIds Cluster ids.
* @param clusterName Cluster name.
+ * @param nodeProductVersion Cluster node product version.
*/
ProtocolContext(
ProtocolVersion ver,
@@ -65,7 +70,8 @@ public class ProtocolContext {
long serverIdleTimeout,
ClusterNode clusterNode,
List<UUID> clusterIds,
- String clusterName
+ String clusterName,
+ IgniteProductVersion nodeProductVersion
) {
this.ver = ver;
this.features = Collections.unmodifiableSet(features != null ?
features : EnumSet.noneOf(ProtocolBitmaskFeature.class));
@@ -73,6 +79,7 @@ public class ProtocolContext {
this.clusterNode = clusterNode;
this.clusterIds = clusterIds;
this.clusterName = clusterName;
+ this.nodeProductVersion = nodeProductVersion;
}
/**
@@ -149,6 +156,15 @@ public class ProtocolContext {
return clusterNode;
}
+ /**
+ * Returns cluster node product version.
+ *
+ * @return cluster node product version.
+ */
+ public IgniteProductVersion productVersion() {
+ return nodeProductVersion;
+ }
+
/**
* Returns cluster ids, from older to newer. The last id is the current
cluster id.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index cbe8b7a2160..ff29f8dbc0c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -133,6 +133,12 @@ public final class ReliableChannel implements
AutoCloseable {
/** Inflights. */
private final ClientTransactionInflights inflights;
+ /**
+ * A validator that is called when a connection to a node is established,
+ * if it throws an exception, the network channel to that node will be
closed.
+ */
+ private final @Nullable ChannelValidator channelValidator;
+
/**
* Constructor.
*
@@ -140,17 +146,22 @@ public final class ReliableChannel implements
AutoCloseable {
* @param clientCfg Client config.
* @param metrics Client metrics.
* @param observableTimeTracker Tracker of the latest time observed by
client.
+ * @param channelValidator A validator that is called when a connection to
a node is established,
+ * if it throws an exception, the network channel
to that node will be closed.
*/
ReliableChannel(
ClientChannelFactory chFactory,
IgniteClientConfiguration clientCfg,
ClientMetricSource metrics,
- HybridTimestampTracker observableTimeTracker) {
+ HybridTimestampTracker observableTimeTracker,
+ @Nullable ChannelValidator channelValidator
+ ) {
this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
this.metrics = metrics;
this.observableTimeTracker =
Objects.requireNonNull(observableTimeTracker, "observableTime");
+ this.channelValidator = channelValidator;
connMgr = new NettyClientConnectionMultiplexer(metrics);
connMgr.start(clientCfg);
@@ -885,6 +896,10 @@ public final class ReliableChannel implements
AutoCloseable {
inflights);
chFut0 = createFut.thenApply(ch -> {
+ if (channelValidator != null) {
+ channelValidator.validate(ch.protocolContext());
+ }
+
UUID currentClusterId = ch.protocolContext().clusterId();
UUID oldClusterId = clusterId.compareAndExchange(null,
currentClusterId);
List<UUID> validClusterIds =
ch.protocolContext().clusterIds();
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 89687d218d0..a67c63284ce 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -66,6 +66,7 @@ import
org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.future.timeout.TimeoutObject;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ViewUtils;
@@ -733,11 +734,13 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
long observableTimestamp = unpacker.unpackLong();
observableTimestampListener.accept(observableTimestamp);
- unpacker.unpackByte(); // cluster version major
- unpacker.unpackByte(); // cluster version minor
- unpacker.unpackByte(); // cluster version maintenance
- unpacker.unpackByteNullable(); // cluster version patch
- unpacker.unpackStringNullable(); // cluster version pre release
+ byte major = unpacker.unpackByte(); // cluster version major
+ byte minor = unpacker.unpackByte(); // cluster version minor
+ byte maintenance = unpacker.unpackByte(); // cluster version
maintenance
+ Byte patch = unpacker.unpackByteNullable(); // cluster version
patch
+ String preRelease = unpacker.unpackStringNullable(); // cluster
version pre release
+
+ IgniteProductVersion nodeProductVersion = new
IgniteProductVersion(major, minor, maintenance, patch, preRelease);
BitSet serverFeatures = HandshakeUtils.unpackFeatures(unpacker);
HandshakeUtils.unpackExtensions(unpacker);
@@ -745,7 +748,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
BitSet mutuallySupportedFeatures =
HandshakeUtils.supportedFeatures(SUPPORTED_FEATURES, serverFeatures);
EnumSet<ProtocolBitmaskFeature> features =
ProtocolBitmaskFeature.enumSet(mutuallySupportedFeatures);
- protocolCtx = new ProtocolContext(srvVer, features,
serverIdleTimeout, clusterNode, clusterIds, clusterName);
+ protocolCtx = new ProtocolContext(srvVer, features,
serverIdleTimeout, clusterNode, clusterIds, clusterName,
+ nodeProductVersion);
return null;
} catch (Throwable e) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index a46dbe2ebce..76fdc0e7eb2 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -96,9 +96,12 @@ public class TcpIgniteClient implements IgniteClient {
*
* @param cfg Config.
* @param observableTimeTracker Tracker of the latest time observed by
client.
+ * @param channelValidator A validator that is called when a connection to
a node is established,
+ * if it throws an exception, the network channel
to that node will be closed.
*/
- private TcpIgniteClient(IgniteClientConfiguration cfg,
HybridTimestampTracker observableTimeTracker) {
- this(TcpClientChannel::createAsync, cfg, observableTimeTracker);
+ private TcpIgniteClient(IgniteClientConfiguration cfg,
HybridTimestampTracker observableTimeTracker,
+ @Nullable ChannelValidator channelValidator) {
+ this(TcpClientChannel::createAsync, cfg, observableTimeTracker,
channelValidator);
}
/**
@@ -107,15 +110,18 @@ public class TcpIgniteClient implements IgniteClient {
* @param chFactory Channel factory.
* @param cfg Config.
* @param observableTimeTracker Tracker of the latest time observed by
client.
+ * @param channelValidator A validator that is called when a connection to
a node is established,
+ * if it throws an exception, the network channel
to that node will be closed.
*/
- private TcpIgniteClient(ClientChannelFactory chFactory,
IgniteClientConfiguration cfg, HybridTimestampTracker observableTimeTracker) {
+ private TcpIgniteClient(ClientChannelFactory chFactory,
IgniteClientConfiguration cfg, HybridTimestampTracker observableTimeTracker,
+ @Nullable ChannelValidator channelValidator) {
assert chFactory != null;
assert cfg != null;
this.cfg = cfg;
metrics = new ClientMetricSource();
- ch = new ReliableChannel(chFactory, cfg, metrics,
observableTimeTracker);
+ ch = new ReliableChannel(chFactory, cfg, metrics,
observableTimeTracker, channelValidator);
tables = new ClientTables(ch, marshallers,
cfg.sqlPartitionAwarenessMetadataCacheSize());
transactions = new ClientTransactions(ch);
compute = new ClientCompute(ch, tables);
@@ -159,7 +165,7 @@ public class TcpIgniteClient implements IgniteClient {
* @return Future representing pending completion of the operation.
*/
public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfiguration cfg) {
- return startAsync(cfg, HybridTimestampTracker.atomicTracker(null));
+ return startAsync(cfg, HybridTimestampTracker.atomicTracker(null),
null);
}
/**
@@ -167,14 +173,17 @@ public class TcpIgniteClient implements IgniteClient {
*
* @param cfg Thin client configuration.
* @param observableTimeTracker Tracker of the latest time observed by
client.
+ * @param channelValidator A validator that is called when a connection to
a node is established,
+ * if it throws an exception, the network channel
to that node will be closed.
* @return Future representing pending completion of the operation.
*/
- public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfiguration cfg, HybridTimestampTracker
observableTimeTracker) {
+ public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfiguration cfg, HybridTimestampTracker
observableTimeTracker,
+ @Nullable ChannelValidator channelValidator) {
ErrorGroups.initialize();
try {
//noinspection resource: returned from method
- var client = new TcpIgniteClient(cfg, observableTimeTracker);
+ var client = new TcpIgniteClient(cfg, observableTimeTracker,
channelValidator);
return client.initAsync().thenApply(x -> client);
} catch (IgniteException e) {
diff --git a/modules/compatibility-tests/build.gradle
b/modules/compatibility-tests/build.gradle
index 88890796433..0dde4d408c4 100644
--- a/modules/compatibility-tests/build.gradle
+++ b/modules/compatibility-tests/build.gradle
@@ -37,6 +37,7 @@ dependencies {
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-client')
integrationTestImplementation project(':ignite-runner')
+ integrationTestImplementation project(':ignite-jdbc')
testFixturesImplementation libs.gradle.tooling.api
testFixturesImplementation libs.awaitility
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/JdbcOverThinSqlWithOldServerCompatibilityTest.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/JdbcOverThinSqlWithOldServerCompatibilityTest.java
new file mode 100644
index 00000000000..a0675e9c5aa
--- /dev/null
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/JdbcOverThinSqlWithOldServerCompatibilityTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.internal.CompatibilityTestBase;
+import org.apache.ignite.lang.ErrorGroups.Client;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Verifies that the JDBC client aborts the connection to a server that does
not support the required feature.
+ */
+@ParameterizedClass
+@MethodSource("serverVersions")
+public class JdbcOverThinSqlWithOldServerCompatibilityTest extends
CompatibilityTestBase {
+ @Override
+ protected void setupBaseVersion(Ignite baseIgnite) {
+ // No-op.
+ }
+
+ @Override
+ protected int nodesCount() {
+ return 1;
+ }
+
+ @Override
+ protected boolean restartWithCurrentEmbeddedVersion() {
+ // Keep old servers running.
+ return false;
+ }
+
+ private static List<String> serverVersions() {
+ return List.of("3.0.0", "3.1.0");
+ }
+
+ @Test
+ void jdbcConnectionToTheOldServerIsRejected() {
+ Throwable ex = assertThrows(SQLException.class,
+ () ->
DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:" +
cluster.clientPort()),
+ "Failed to connect to server"
+ );
+
+ Throwable cause = ex.getCause();
+
+ assertThat(cause, instanceOf(IgniteClientConnectionException.class));
+
+ IgniteClientConnectionException connectEx =
(IgniteClientConnectionException) cause;
+
+ assertThat(connectEx.getMessage(),
+ containsString("Connection to node aborted, because the node
does not support the feature required "
+ + "by the driver being used. Please refer to the
documentation and use a compatible version "
+ + "of the JDBC driver to connect to this node"));
+ assertThat(connectEx.code(), is(Client.CONNECTION_ERR));
+ }
+}
diff --git
a/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
b/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
index 51fe9ec52c9..1cab5eb6052 100644
---
a/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
+++
b/modules/compatibility-tests/src/testFixtures/java/org/apache/ignite/internal/IgniteCluster.java
@@ -246,7 +246,7 @@ public class IgniteCluster {
* @return Ignite client instance.
*/
public IgniteClient createClient() {
- return IgniteClient.builder().addresses("localhost:" +
clusterConfiguration.baseClientPort()).build();
+ return IgniteClient.builder().addresses("localhost:" +
clientPort()).build();
}
/**
@@ -269,6 +269,11 @@ public class IgniteCluster {
return
clusterConfiguration.nodeNamingStrategy().nodeName(clusterConfiguration,
nodeIndex);
}
+ /** Returns base client port number from cluster configuration. */
+ public int clientPort() {
+ return clusterConfiguration.baseClientPort();
+ }
+
private ServerRegistration startEmbeddedNode(int nodeIndex) {
String nodeName = nodeName(nodeIndex);
diff --git
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
index f73ec73c812..9dd184633fe 100644
---
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
+++
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcConnection.java
@@ -190,7 +190,7 @@ public class JdbcConnection implements Connection {
IgniteClientConfiguration.DFLT_SQL_PARTITION_AWARENESS_METADATA_CACHE_SIZE
);
- return (TcpIgniteClient) sync(TcpIgniteClient.startAsync(cfg,
observableTimeTracker));
+ return (TcpIgniteClient) sync(TcpIgniteClient.startAsync(cfg,
observableTimeTracker, null));
}
/**
diff --git
a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
index 36e80491f41..60368faddbb 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.jdbc;
import static
org.apache.ignite.internal.jdbc.ConnectionPropertiesImpl.URL_PREFIX;
import static
org.apache.ignite.internal.jdbc.proto.SqlStateCode.CLIENT_CONNECTION_FAILED;
import static org.apache.ignite.internal.util.ViewUtils.sync;
+import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import com.google.auto.service.AutoService;
import java.sql.Connection;
@@ -34,10 +35,14 @@ import java.util.logging.Logger;
import org.apache.ignite.client.BasicAuthenticator;
import org.apache.ignite.client.IgniteClientAuthenticator;
import org.apache.ignite.client.IgniteClientConfiguration;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.client.SslConfiguration;
+import org.apache.ignite.internal.client.ChannelValidator;
import org.apache.ignite.internal.client.HostAndPort;
import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.jdbc.ConnectionProperties;
@@ -45,6 +50,8 @@ import
org.apache.ignite.internal.jdbc.ConnectionPropertiesImpl;
import org.apache.ignite.internal.jdbc.JdbcClientQueryEventHandler;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
import org.apache.ignite.internal.jdbc2.JdbcConnection2;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
@@ -275,7 +282,7 @@ public class IgniteJdbcDriver implements Driver {
return instance != null;
}
- private static TcpIgniteClient createIgniteClient(
+ private TcpIgniteClient createIgniteClient(
ConnectionProperties connectionProperties,
HybridTimestampTracker observableTimeTracker
) {
@@ -293,7 +300,7 @@ public class IgniteJdbcDriver implements Driver {
null,
IgniteClientConfigurationImpl.DFLT_HEARTBEAT_INTERVAL,
IgniteClientConfigurationImpl.DFLT_HEARTBEAT_TIMEOUT,
- null,
+ new RetryLimitPolicy(),
null,
extractSslConfiguration(connectionProperties),
false,
@@ -302,7 +309,24 @@ public class IgniteJdbcDriver implements Driver {
connectionProperties.getPartitionAwarenessMetadataCacheSize()
);
- return (TcpIgniteClient) sync(TcpIgniteClient.startAsync(cfg,
observableTimeTracker));
+ ChannelValidator channelValidator = ctx -> {
+ if
(!ctx.isFeatureSupported(ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT)) {
+ ClusterNode node = ctx.clusterNode();
+
+ throw new IgniteClientConnectionException(
+ CONNECTION_ERR,
+ IgniteStringFormatter.format("Connection to node
aborted, because the node does not support "
+ + "the feature required by the driver being
used. Please refer to the documentation and use a compatible "
+ + "version of the JDBC driver to connect to
this node "
+ + "[name={}, address={}, productVersion={},
driverVersion={}.{}]",
+ node.name(), node.address(),
ctx.productVersion(), getMajorVersion(), getMinorVersion()),
+ null
+ );
+ }
+ };
+
+ return (TcpIgniteClient) sync(TcpIgniteClient.startAsync(
+ cfg, observableTimeTracker, channelValidator));
}
private static @Nullable SslConfiguration
extractSslConfiguration(ConnectionProperties connProps) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
new file mode 100644
index 00000000000..0ec3f423661
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_BACKGROUND_RECONNECT_INTERVAL;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.IgniteClientConfiguration;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.RetryLimitPolicy;
+import org.apache.ignite.internal.client.ChannelValidator;
+import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
+import org.apache.ignite.internal.client.ProtocolContext;
+import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * End-to-end tests for the thin client network channel validator.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItThinClientChannelValidatorTest extends BaseIgniteAbstractTest {
+ private static final String ERROR_MESSAGE = "Connection to node aborted
[name={}, address={}, version={}]";
+
+ private static final int NODES_COUNT = 5;
+
+ private final List<Ignite> startedNodes = new ArrayList<>();
+
+ private List<IgniteServer> nodes;
+
+ @BeforeAll
+ void beforeAll(TestInfo testInfo, @WorkDirectory Path workDir) {
+ setupCluster(testInfo, workDir);
+ }
+
+ @AfterAll
+ void afterAll() throws Exception {
+ var closeables = new ArrayList<AutoCloseable>();
+
+ nodes.stream()
+ .map(node -> (AutoCloseable) node::shutdown)
+ .forEach(closeables::add);
+
+ IgniteUtils.closeAll(closeables);
+ }
+
+ /**
+ * The test verifies that the connection is established successfully if
there is
+ * at least one compatible node in the cluster to connect to.
+ */
+ @ParameterizedTest(name = "node = {0}")
+ @ValueSource(ints = {0, NODES_COUNT - 1})
+ void singleNodeCompatible(int compatibleNodeIdx) throws
InterruptedException {
+ CountDownLatch latch = new CountDownLatch(startedNodes.size());
+ String compatibleNode = startedNodes.get(compatibleNodeIdx).name();
+
+ ChannelValidator validator = ctx -> {
+ latch.countDown();
+
+ if (!compatibleNode.equals(ctx.clusterNode().name())) {
+ raiseConnectException(ctx);
+ }
+ };
+
+ long reconnectInterval = 1_000;
+
+ try (IgniteClient client = startClient(reconnectInterval, validator)) {
+ boolean validatorChecksMatch = latch.await(5, TimeUnit.SECONDS);
+
+ assertThat(validatorChecksMatch, is(true));
+
+ Awaitility.await().timeout(3, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(client.connections().size(), is(1)));
+
+ client.sql().execute(null, "SELECT 1").close();
+
+ // Make sure there are no new connections.
+ Thread.sleep(reconnectInterval * 2);
+ assertThat(client.connections().size(), is(1));
+ }
+ }
+
+ /**
+ * Test checks the channel validation exception if there are no compatible
nodes.
+ */
+ @Test
+ void noCompatibleNodes() {
+ AtomicInteger attemptCounter = new AtomicInteger();
+
+ ChannelValidator validator = ctx -> {
+ attemptCounter.getAndIncrement();
+
+ raiseConnectException(ctx);
+ };
+
+ IgniteTestUtils.assertThrows(
+ IgniteClientConnectionException.class,
+ () -> startClient(DFLT_BACKGROUND_RECONNECT_INTERVAL,
validator),
+ "Connection to node aborted"
+ );
+
+ assertThat(attemptCounter.get(), is(RetryLimitPolicy.DFLT_RETRY_LIMIT
+ /* initial attempt */ 1));
+ }
+
+ /**
+ * Test verifies that the thin client successfully performs background
reconnection to nodes.
+ */
+ @ParameterizedTest(name = "node = {0}")
+ @ValueSource(ints = {0, NODES_COUNT - 1})
+ void backgroundReconnect(int compatibleNodeIdx) throws
InterruptedException {
+ CountDownLatch latch = new CountDownLatch(startedNodes.size());
+ String compatibleNode = startedNodes.get(compatibleNodeIdx).name();
+ AtomicBoolean allowAll = new AtomicBoolean(false);
+
+ ChannelValidator validator = ctx -> {
+ latch.countDown();
+
+ if (allowAll.get() ||
compatibleNode.equals(ctx.clusterNode().name())) {
+ return;
+ }
+
+ raiseConnectException(ctx);
+ };
+
+ long reconnectInterval = 1_000;
+
+ try (IgniteClient client = startClient(reconnectInterval, validator)) {
+ boolean validatorChecksMatch = latch.await(5, TimeUnit.SECONDS);
+ assertThat(validatorChecksMatch, is(true));
+
+ Awaitility.await().timeout(3, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(client.connections().size(), is(1)));
+
+ assertThat(client.connections().size(), is(1));
+
+ allowAll.set(true);
+
+ // Connections to all nodes should be established.
+ Awaitility.await().timeout(10, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertThat(client.connections().size(), is(nodes.size())));
+ }
+ }
+
+ private IgniteClient startClient(long reconnectInterval, ChannelValidator
channelValidator) {
+ String[] addresses = startedNodes.stream()
+ .map(ignite -> unwrapIgniteImpl(ignite).clientAddress().port())
+ .map(port -> "127.0.0.1" + ":" + port)
+ .toArray(String[]::new);
+
+ var cfg = new IgniteClientConfigurationImpl(
+ null,
+ addresses,
+ 0,
+ reconnectInterval,
+ null,
+ IgniteClientConfigurationImpl.DFLT_HEARTBEAT_INTERVAL,
+ IgniteClientConfigurationImpl.DFLT_HEARTBEAT_TIMEOUT,
+ new RetryLimitPolicy(),
+ null,
+ null,
+ false,
+ null,
+ IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT,
+
IgniteClientConfiguration.DFLT_SQL_PARTITION_AWARENESS_METADATA_CACHE_SIZE
+ );
+
+ return await(TcpIgniteClient.startAsync(
+ cfg,
+ HybridTimestampTracker.atomicTracker(null),
+ channelValidator
+ ));
+ }
+
+ private static void raiseConnectException(ProtocolContext ctx) {
+ ClusterNode node = ctx.clusterNode();
+
+ throw new IgniteClientConnectionException(
+ CONNECTION_ERR,
+ IgniteStringFormatter.format(ERROR_MESSAGE, node.name(),
node.address(), ctx.productVersion()),
+ null
+ );
+ }
+
+ private void setupCluster(TestInfo testInfo, Path workDir) {
+ Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>();
+
+ for (int i = 0; i < NODES_COUNT; i++) {
+ String nodeName = testNodeName(testInfo, 3344 + i);
+
+ nodesBootstrapCfg.put(
+ nodeName,
+ "ignite {\n"
+ + " network.port: " + (3344 + i) + ",\n"
+ + " network.nodeFinder.netClusterNodes: [
\"localhost:3344\" ]\n"
+ + (i == 1 ? ("
clientConnector.sendServerExceptionStackTraceToClient: true\n"
+ + " clientConnector.metricsEnabled: true\n") : "")
+ + " clientConnector.port: " + (10800 + i) + ",\n"
+ + " rest.port: " + (10300 + i) + "\n"
+ + " compute.threadPoolSize: 1\n"
+ + "}"
+ );
+ }
+
+ nodes = nodesBootstrapCfg.entrySet().stream()
+ .map(e -> TestIgnitionManager.start(e.getKey(), e.getValue(),
workDir.resolve(e.getKey())))
+ .collect(toList());
+
+ IgniteServer metaStorageNode = nodes.get(0);
+
+ InitParameters initParameters = InitParameters.builder()
+ .metaStorageNodes(metaStorageNode)
+ .clusterName("cluster")
+ .build();
+ TestIgnitionManager.init(metaStorageNode, initParameters);
+
+ for (IgniteServer node : nodes) {
+ assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+ startedNodes.add(node.api());
+ }
+ }
+}