This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 3ec496400 IGNITE-16823 .NET: Add cluster awareness to Compute (#771) 3ec496400 is described below commit 3ec4964003feafbf8f066dc4acac1f095c55f86c Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu Apr 14 10:12:08 2022 +0300 IGNITE-16823 .NET: Add cluster awareness to Compute (#771) * Include node id and name into handshake response. * Establish connections to all known endpoints in background. * During Compute calls, match target node name against active connections and send request directly to the correct node when possible. --- .../handler/ClientInboundMessageHandler.java | 10 +- .../compute/ClientComputeExecuteRequest.java | 13 +- .../ignite/internal/client/ProtocolContext.java | 31 ++++- .../ignite/internal/client/TcpClientChannel.java | 27 ++-- .../internal/client/compute/ClientCompute.java | 4 - .../internal/client/io/ClientConnection.java | 8 ++ .../client/io/netty/NettyClientConnection.java | 7 + .../ignite/client/TestClientHandlerModule.java | 8 +- .../java/org/apache/ignite/client/TestServer.java | 8 +- .../platforms/dotnet/Apache.Ignite.Tests.ruleset | 3 + .../Compute/ComputeClusterAwarenessTests.cs | 108 +++++++++++++++ .../Apache.Ignite.Tests/Compute/ComputeTests.cs | 12 ++ .../dotnet/Apache.Ignite.Tests/FakeServer.cs | 47 ++++++- .../RawSocketConnectionTests.cs | 2 +- .../dotnet/Apache.Ignite.Tests/TestUtils.cs | 25 ++++ .../dotnet/Apache.Ignite/IIgniteClient.cs | 8 ++ .../platforms/dotnet/Apache.Ignite/IgniteClient.cs | 4 +- .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 153 +++++++++++++++++++-- .../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 41 +++--- .../Apache.Ignite/Internal/Compute/Compute.cs | 43 ++++-- .../{Table/Schema.cs => ConnectionContext.cs} | 23 ++-- .../Apache.Ignite/Internal/IgniteClientInternal.cs | 5 + .../dotnet/Apache.Ignite/Internal/Table/Schema.cs | 7 +- 23 files changed, 500 insertions(+), 97 deletions(-) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index 72194073c..7656432c7 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.apache.ignite.lang.IgniteLogger; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.table.manager.IgniteTables; import org.apache.ignite.tx.IgniteTransactions; @@ -189,11 +190,16 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter { // Response. ProtocolVersion.LATEST_VER.pack(packer); - packer.packInt(ClientErrorCode.SUCCESS); + + packer.packLong(configuration.idleTimeout()); + + ClusterNode localMember = clusterService.topologyService().localMember(); + packer.packString(localMember.id()); + packer.packString(localMember.name()); + packer.packBinaryHeader(0); // Features. packer.packMapHeader(0); // Extensions. - packer.packLong(configuration.idleTimeout()); write(packer, ctx); } catch (Throwable t) { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java index 78c4ff5f6..926e3821b 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java @@ -24,9 +24,8 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; -import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.NetworkAddress; import org.jetbrains.annotations.NotNull; /** @@ -47,9 +46,15 @@ public class ClientComputeExecuteRequest { ClientMessagePacker out, IgniteCompute compute, ClusterService cluster) { - var node = in.tryUnpackNil() + var nodeName = in.tryUnpackNil() ? null : in.unpackString(); + + var node = nodeName == null ? cluster.topologyService().localMember() - : new ClusterNode(in.unpackString(), in.unpackString(), new NetworkAddress(in.unpackString(), in.unpackInt())); + : cluster.topologyService().getByConsistentId(nodeName); + + if (node == null) { + throw new IgniteException("Specified node is not present in the cluster: " + nodeName); + } String jobClassName = in.unpackString(); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java index 70a14188b..bacb0e1b4 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.client; +import java.util.Collections; import java.util.EnumSet; +import java.util.Set; import org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException; import org.apache.ignite.internal.client.proto.ProtocolVersion; +import org.apache.ignite.network.ClusterNode; /** * Protocol Context. @@ -29,22 +32,31 @@ public class ProtocolContext { private final ProtocolVersion ver; /** Features. */ - private final EnumSet<ProtocolBitmaskFeature> features; + private final Set<ProtocolBitmaskFeature> features; /** Server idle timeout. */ private final long serverIdleTimeout; + /** Cluster node. */ + private final ClusterNode clusterNode; + /** * Constructor. * * @param ver Protocol version. * @param features Supported features. * @param serverIdleTimeout Server idle timeout. + * @param clusterNode Cluster node. */ - public ProtocolContext(ProtocolVersion ver, EnumSet<ProtocolBitmaskFeature> features, long serverIdleTimeout) { + public ProtocolContext( + ProtocolVersion ver, + EnumSet<ProtocolBitmaskFeature> features, + long serverIdleTimeout, + ClusterNode clusterNode) { this.ver = ver; - this.features = features != null ? features : EnumSet.noneOf(ProtocolBitmaskFeature.class); + this.features = Collections.unmodifiableSet(features != null ? features : EnumSet.noneOf(ProtocolBitmaskFeature.class)); this.serverIdleTimeout = serverIdleTimeout; + this.clusterNode = clusterNode; } /** @@ -74,7 +86,7 @@ public class ProtocolContext { * * @return Supported features. */ - public EnumSet<ProtocolBitmaskFeature> features() { + public Set<ProtocolBitmaskFeature> features() { return features; } @@ -92,7 +104,16 @@ public class ProtocolContext { * * @return Server idle timeout. */ - public long getServerIdleTimeout() { + public long serverIdleTimeout() { return serverIdleTimeout; } + + /** + * Returns cluster node. + * + * @return Cluster node. + */ + public ClusterNode clusterNode() { + return clusterNode; + } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index 6effbb7d4..60d46d6b4 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -49,6 +49,8 @@ import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.client.proto.ProtocolVersion; import org.apache.ignite.internal.client.proto.ServerMessageType; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; import org.jetbrains.annotations.Nullable; /** @@ -368,17 +370,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon write(req).syncUninterruptibly(); } - /** - * Returns protocol context for a version. - * - * @param ver Protocol version. - * @param serverIdleTimeout Server idle timeout. - * @return Protocol context for a version. - */ - private ProtocolContext protocolContextFromVersion(ProtocolVersion ver, long serverIdleTimeout) { - return new ProtocolContext(ver, ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout); - } - /** Receive and handle handshake response. */ private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion proposedVer) throws IgniteClientConnectionException, IgniteClientAuthenticationException { @@ -394,7 +385,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon if (errCode == ClientErrorCode.AUTH_FAILED) { throw new IgniteClientAuthenticationException(msg); } else if (proposedVer.equals(srvVer)) { - throw new IgniteClientException("Client protocol error: unexpected server response."); + throw new IgniteClientException("Client protocol error: unexpected server response '" + msg + "'."); } else if (!supportedVers.contains(srvVer)) { throw new IgniteClientException(String.format( "Protocol version mismatch: client %s / server %s. Server details: %s", @@ -409,15 +400,19 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon throw new IgniteClientConnectionException(msg); } + var serverIdleTimeout = unpacker.unpackLong(); + var clusterNodeId = unpacker.unpackString(); + var clusterNodeName = unpacker.unpackString(); + var addr = sock.remoteAddress(); + var clusterNode = new ClusterNode(clusterNodeId, clusterNodeName, new NetworkAddress(addr.getHostName(), addr.getPort())); + var featuresLen = unpacker.unpackBinaryHeader(); unpacker.skipValues(featuresLen); var extensionsLen = unpacker.unpackMapHeader(); unpacker.skipValues(extensionsLen); - var serverIdleTimeout = unpacker.unpackLong(); - - protocolCtx = protocolContextFromVersion(srvVer, serverIdleTimeout); + protocolCtx = new ProtocolContext(srvVer, ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout, clusterNode); } } @@ -453,7 +448,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon * @return Resolved interval. */ private long getHeartbeatInterval(long configuredInterval) { - long serverIdleTimeoutMs = protocolCtx.getServerIdleTimeout(); + long serverIdleTimeoutMs = protocolCtx.serverIdleTimeout(); if (serverIdleTimeoutMs <= 0) { return configuredInterval; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java index 5aaa95d07..b5fb1df8d 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java @@ -131,11 +131,7 @@ public class ClientCompute implements IgniteCompute { private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, String jobClassName, Object[] args) { return ch.serviceAsync(ClientOp.COMPUTE_EXECUTE, w -> { // TODO: Cluster awareness (IGNITE-16771): if the specified node matches existing connection, send nil. - w.out().packString(node.id()); w.out().packString(node.name()); - w.out().packString(node.address().host()); - w.out().packInt(node.address().port()); - w.out().packString(jobClassName); w.out().packObjectArray(args); }, r -> (R) r.in().unpackObjectWithType()); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java index 42b80b9ad..70bc4115b 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.io; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; +import java.net.InetSocketAddress; import org.apache.ignite.lang.IgniteException; /** @@ -40,6 +41,13 @@ public interface ClientConnection extends AutoCloseable { */ ByteBuf getBuffer(); + /** + * Gets the remote address. + * + * @return Remote address. + */ + InetSocketAddress remoteAddress(); + /** * Closes the connection. */ diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java index a0541cbad..a746517b1 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.AttributeKey; import java.io.IOException; +import java.net.InetSocketAddress; import org.apache.ignite.internal.client.io.ClientConnection; import org.apache.ignite.internal.client.io.ClientConnectionStateHandler; import org.apache.ignite.internal.client.io.ClientMessageHandler; @@ -71,6 +72,12 @@ public class NettyClientConnection implements ClientConnection { return channel.alloc().buffer(); } + /** {@inheritDoc} */ + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) channel.remoteAddress(); + } + /** {@inheritDoc} */ @Override public void close() { diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java index 79e85c8f9..d4d2e662d 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java @@ -17,6 +17,7 @@ package org.apache.ignite.client; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import io.netty.bootstrap.ServerBootstrap; @@ -43,6 +44,7 @@ import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NettyBootstrapFactory; import org.jetbrains.annotations.Nullable; +import org.mockito.Mockito; /** * Client handler module for tests. @@ -139,6 +141,10 @@ public class TestClientHandlerModule implements IgniteComponent { ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap(); + ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS); + Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id"); + Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id"); + bootstrap.childHandler(new ChannelInitializer<>() { @Override protected void initChannel(Channel ch) { @@ -151,7 +157,7 @@ public class TestClientHandlerModule implements IgniteComponent { mock(QueryProcessor.class), configuration, mock(IgniteCompute.class), - mock(ClusterService.class))); + clusterService)); } }) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout()); diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java index 7fa63f240..7f8b8f791 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java @@ -18,6 +18,7 @@ package org.apache.ignite.client; import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import java.net.InetSocketAddress; @@ -37,6 +38,7 @@ import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NettyBootstrapFactory; +import org.mockito.Mockito; /** * Test server. @@ -98,6 +100,10 @@ public class TestServer implements AutoCloseable { bootstrapFactory.start(); + ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS); + Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id"); + Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id"); + module = shouldDropConnection != null ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, shouldDropConnection) : new ClientHandlerModule( @@ -106,7 +112,7 @@ public class TestServer implements AutoCloseable { ignite.transactions(), cfg, mock(IgniteCompute.class), - mock(ClusterService.class), + clusterService, bootstrapFactory ); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset b/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset index 4015c5020..a95763d3d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset +++ b/modules/platforms/dotnet/Apache.Ignite.Tests.ruleset @@ -52,5 +52,8 @@ <!-- UnusedParameters --> <Rule Id="CA1801" Action="None" /> + + <!-- Validate parameters. --> + <Rule Id="CA1062" Action="None" /> </Rules> </RuleSet> diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs new file mode 100644 index 000000000..8c798f7bc --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Tests.Compute +{ + using System.Linq; + using System.Threading.Tasks; + using Internal.Proto; + using NUnit.Framework; + + /// <summary> + /// Tests compute cluster awareness: client requests can be sent to correct server nodes when a direct connection is available. + /// </summary> + public class ComputeClusterAwarenessTests + { + [Test] + public async Task TestClientSendsComputeJobToTargetNodeWhenDirectConnectionExists() + { + using var server1 = new FakeServer(nodeName: "s1"); + using var server2 = new FakeServer(nodeName: "s2"); + using var server3 = new FakeServer(nodeName: "s3"); + + var clientCfg = new IgniteClientConfiguration + { + Endpoints = { server1.Node.Address.ToString(), server2.Node.Address.ToString(), server3.Node.Address.ToString() } + }; + + using var client = await IgniteClient.StartAsync(clientCfg); + + // ReSharper disable once AccessToDisposedClosure + TestUtils.WaitForCondition(() => client.GetConnections().Count == 3); + + var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[] { server2.Node }, jobClassName: string.Empty); + var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[] { server3.Node }, jobClassName: string.Empty); + + Assert.AreEqual("s2", res2); + Assert.AreEqual("s3", res3); + + Assert.AreEqual(ClientOp.ComputeExecute, server2.ClientOps.Single()); + Assert.AreEqual(ClientOp.ComputeExecute, server3.ClientOps.Single()); + + Assert.IsEmpty(server1.ClientOps); + } + + [Test] + public async Task TestClientSendsComputeJobToDefaultNodeWhenDirectConnectionToTargetDoesNotExist() + { + using var server1 = new FakeServer(nodeName: "s1"); + using var server2 = new FakeServer(nodeName: "s2"); + using var server3 = new FakeServer(nodeName: "s3"); + + using var client = await server1.ConnectClientAsync(); + + var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[] { server2.Node }, jobClassName: string.Empty); + var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[] { server3.Node }, jobClassName: string.Empty); + + Assert.AreEqual("s1", res2); + Assert.AreEqual("s1", res3); + Assert.AreEqual(new[] { ClientOp.ComputeExecute, ClientOp.ComputeExecute }, server1.ClientOps); + + Assert.IsEmpty(server2.ClientOps); + Assert.IsEmpty(server3.ClientOps); + + Assert.AreEqual(server1.Node, client.GetConnections().Single()); + } + + [Test] + public async Task TestClientRetriesComputeJobOnPrimaryAndDefaultNodes() + { + using var server1 = new FakeServer(shouldDropConnection: cnt => cnt % 2 == 0, nodeName: "s1"); + using var server2 = new FakeServer(shouldDropConnection: cnt => cnt % 2 == 0, nodeName: "s2"); + + var clientCfg = new IgniteClientConfiguration + { + Endpoints = { server1.Node.Address.ToString(), server2.Node.Address.ToString() }, + RetryPolicy = new RetryLimitPolicy { RetryLimit = 2 } + }; + + using var client = await IgniteClient.StartAsync(clientCfg); + + // ReSharper disable once AccessToDisposedClosure + TestUtils.WaitForCondition(() => client.GetConnections().Count == 2); + + for (int i = 0; i < 100; i++) + { + var node = i % 2 == 0 ? server1.Node : server2.Node; + + var res = await client.Compute.ExecuteAsync<string>(nodes: new[] { node }, jobClassName: string.Empty); + + Assert.AreEqual(node.Name, res); + } + } + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs index 843e70ebd..35b1d38f5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Tests.Compute using System.Net; using System.Threading.Tasks; using Ignite.Compute; + using Internal.Network; using Network; using NUnit.Framework; @@ -130,6 +131,17 @@ namespace Apache.Ignite.Tests.Compute Assert.AreEqual("class org.apache.ignite.tx.TransactionException: Custom job error", ex!.Message); } + [Test] + public void TestUnknownNodeThrows() + { + var unknownNode = new ClusterNode("x", "y", new IPEndPoint(IPAddress.Loopback, 0)); + + var ex = Assert.ThrowsAsync<IgniteClientException>(async () => + await Client.Compute.ExecuteAsync<string>(new[] { unknownNode }, EchoJob, "unused")); + + Assert.AreEqual("Specified node is not present in the cluster: y", ex!.Message); + } + // TODO: Support all types (IGNITE-15431). [Test] public async Task TestAllSupportedArgTypes() diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs index 8df6aa6e3..54e57b7c8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs @@ -19,12 +19,17 @@ namespace Apache.Ignite.Tests { using System; using System.Buffers; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; + using Internal.Network; using Internal.Proto; using MessagePack; + using Network; /// <summary> /// Fake Ignite server for test purposes. @@ -41,7 +46,9 @@ namespace Apache.Ignite.Tests private readonly Func<int, bool> _shouldDropConnection; - public FakeServer(Func<int, bool>? shouldDropConnection = null) + private readonly ConcurrentQueue<ClientOp> _ops = new(); + + public FakeServer(Func<int, bool>? shouldDropConnection = null, string nodeName = "fake-server") { _shouldDropConnection = shouldDropConnection ?? (_ => false); _listener = new Socket(IPAddress.Loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp); @@ -49,9 +56,15 @@ namespace Apache.Ignite.Tests _listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); _listener.Listen(backlog: 1); + Node = new ClusterNode("id-" + nodeName, nodeName, (IPEndPoint)_listener.LocalEndPoint); + Task.Run(ListenLoop); } + public IClusterNode Node { get; } + + internal IList<ClientOp> ClientOps => _ops.ToList(); + public async Task<IIgniteClient> ConnectClientAsync(IgniteClientConfiguration? cfg = null) { var port = ((IPEndPoint)_listener.LocalEndPoint).Port; @@ -110,8 +123,20 @@ namespace Apache.Ignite.Tests // Write handshake response. handler.Send(ProtoCommon.MagicBytes); - handler.Send(new byte[] { 0, 0, 0, 8 }); // Size. - handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128, 0 }); + + var handshakeBufferWriter = new ArrayBufferWriter<byte>(); + var handshakeWriter = new MessagePackWriter(handshakeBufferWriter); + handshakeWriter.Write(0); // Idle timeout. + handshakeWriter.Write(Node.Id); // Node id. + handshakeWriter.Write(Node.Name); // Node name (consistent id). + handshakeWriter.WriteBinHeader(0); // Features. + handshakeWriter.WriteMapHeader(0); // Extensions. + handshakeWriter.Flush(); + + handler.Send(new byte[] { 0, 0, 0, (byte)(4 + handshakeBufferWriter.WrittenCount) }); // Size. + handler.Send(new byte[] { 3, 0, 0, 0 }); // Version and success flag. + + handler.Send(handshakeBufferWriter.WrittenSpan); while (!_cts.IsCancellationRequested) { @@ -127,6 +152,8 @@ namespace Apache.Ignite.Tests var opCode = (ClientOp)msg[0]; var requestId = msg[1]; + _ops.Enqueue(opCode); + if (opCode == ClientOp.TablesGet) { handler.Send(new byte[] { 0, 0, 0, 4 }); // Size. @@ -189,6 +216,20 @@ namespace Apache.Ignite.Tests continue; } + if (opCode == ClientOp.ComputeExecute) + { + var arrayBufferWriter = new ArrayBufferWriter<byte>(); + var writer = new MessagePackWriter(arrayBufferWriter); + writer.Write(Node.Name); + writer.Flush(); + + handler.Send(new byte[] { 0, 0, 0, (byte)(4 + arrayBufferWriter.WrittenCount) }); // Size. + handler.Send(new byte[] { 0, requestId, 0, (byte)ClientDataType.String }); + handler.Send(arrayBufferWriter.WrittenSpan); + + continue; + } + // Fake error message for any other op code. handler.Send(new byte[] { 0, 0, 0, 8 }); // Size. handler.Send(new byte[] { 0, requestId, 1, 160 | 4, (byte)Err[0], (byte)Err[1], (byte)Err[2], (byte)Err[3] }); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs index 222d55a81..e819b162b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs @@ -83,7 +83,7 @@ namespace Apache.Ignite.Tests var str = Encoding.UTF8.GetString(msg); - Assert.AreEqual(10, msgSize, str); + Assert.AreEqual(110, msgSize, str); // Protocol version. Assert.AreEqual(3, msg[0]); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs index 5a49866ab..fa35d387a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs @@ -18,9 +18,12 @@ namespace Apache.Ignite.Tests { using System; + using System.Diagnostics; using System.IO; using System.Reflection; using System.Runtime.InteropServices; + using System.Threading; + using NUnit.Framework; public static class TestUtils { @@ -30,6 +33,28 @@ namespace Apache.Ignite.Tests public static bool IsWindows => RuntimeInformation.IsOSPlatform(OSPlatform.Windows); + public static void WaitForCondition(Func<bool> condition, int timeoutMs = 1000) + { + if (condition()) + { + return; + } + + var sw = Stopwatch.StartNew(); + + while (sw.ElapsedMilliseconds < timeoutMs) + { + if (condition()) + { + return; + } + + Thread.Sleep(50); + } + + Assert.Fail("Condition not reached after " + sw.Elapsed); + } + private static string GetSolutionDir() { var dir = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); diff --git a/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs index 83b302083..2ef60afff 100644 --- a/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite/IIgniteClient.cs @@ -18,6 +18,8 @@ namespace Apache.Ignite { using System; + using System.Collections.Generic; + using Network; /// <summary> /// Ignite client. @@ -30,5 +32,11 @@ namespace Apache.Ignite /// Gets the configuration. /// </summary> IgniteClientConfiguration Configuration { get; } + + /// <summary> + /// Gets active connections. + /// </summary> + /// <returns>A list of connected cluster nodes.</returns> + IList<IClusterNode> GetConnections(); } } diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs index 5a4819146..80f642c8e 100644 --- a/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClient.cs @@ -35,9 +35,7 @@ namespace Apache.Ignite { IgniteArgumentCheck.NotNull(configuration, nameof(configuration)); - var socket = new ClientFailoverSocket(configuration); - - await socket.ConnectAsync().ConfigureAwait(false); + var socket = await ClientFailoverSocket.ConnectAsync(configuration).ConfigureAwait(false); return new IgniteClientInternal(socket); } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs index fa5d2d16a..6519e9f80 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Internal { using System; + using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -42,7 +43,10 @@ namespace Apache.Ignite.Internal private readonly IIgniteLogger? _logger; /** Endpoints with corresponding hosts - from configuration. */ - private readonly IReadOnlyList<SocketEndpoint> _endPoints; + private readonly IReadOnlyList<SocketEndpoint> _endpoints; + + /** Cluster node unique name to endpoint map. */ + private readonly ConcurrentDictionary<string, SocketEndpoint> _endpointsMap = new(); /** <see cref="_socket"/> lock. */ [SuppressMessage( @@ -51,7 +55,7 @@ namespace Apache.Ignite.Internal Justification = "WaitHandle is not used in SemaphoreSlim, no need to dispose.")] private readonly SemaphoreSlim _socketLock = new(1); - /** Primary socket. */ + /** Primary socket. Guarded by <see cref="_socketLock"/>. */ private ClientSocket? _socket; /** Disposed flag. */ @@ -61,7 +65,7 @@ namespace Apache.Ignite.Internal /// Initializes a new instance of the <see cref="ClientFailoverSocket"/> class. /// </summary> /// <param name="configuration">Client configuration.</param> - public ClientFailoverSocket(IgniteClientConfiguration configuration) + private ClientFailoverSocket(IgniteClientConfiguration configuration) { if (configuration.Endpoints.Count == 0) { @@ -71,7 +75,7 @@ namespace Apache.Ignite.Internal } _logger = configuration.Logger.GetLogger(GetType()); - _endPoints = GetIpEndPoints(configuration).ToList(); + _endpoints = GetIpEndPoints(configuration).ToList(); Configuration = new(configuration); // Defensive copy. } @@ -84,10 +88,19 @@ namespace Apache.Ignite.Internal /// <summary> /// Connects the socket. /// </summary> + /// <param name="configuration">Client configuration.</param> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> - public async Task ConnectAsync() + public static async Task<ClientFailoverSocket> ConnectAsync(IgniteClientConfiguration configuration) { - await GetSocketAsync().ConfigureAwait(false); + var socket = new ClientFailoverSocket(configuration); + + await socket.GetSocketAsync().ConfigureAwait(false); + + // Because this call is not awaited, execution of the current method continues before the call is completed. + // Secondary connections are established in the background. + _ = socket.ConnectAllSockets(); + + return socket; } /// <summary> @@ -119,6 +132,59 @@ namespace Apache.Ignite.Internal } } + /// <summary> + /// Gets the endpoint by unique cluster node name. + /// </summary> + /// <param name="clusterNodeName">Cluster node name.</param> + /// <returns>Endpoint or null.</returns> + public SocketEndpoint? GetEndpoint(string clusterNodeName) + { + return _endpointsMap.TryGetValue(clusterNodeName, out var e) ? e : null; + } + + /// <summary> + /// Performs an in-out operation on the specified endpoint. + /// </summary> + /// <param name="endpoint">Endpoint.</param> + /// <param name="clientOp">Client op code.</param> + /// <param name="request">Request data.</param> + /// <returns>Response data.</returns> + public async Task<PooledBuffer?> TryDoOutInOpAsync(SocketEndpoint endpoint, ClientOp clientOp, PooledArrayBufferWriter? request) + { + try + { + var socket = endpoint.Socket; + + if (socket == null || socket.IsDisposed) + { + await _socketLock.WaitAsync().ConfigureAwait(false); + + try + { + socket = await ConnectAsync(endpoint).ConfigureAwait(false); + } + finally + { + _socketLock.Release(); + } + } + + return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false); + } + catch (Exception e) + { + int attempt = 0; + List<Exception>? errors = null; + + if (HandleOpError(e, clientOp, ref attempt, ref errors)) + { + return null; + } + + throw; + } + } + /// <inheritdoc/> public void Dispose() { @@ -133,7 +199,10 @@ namespace Apache.Ignite.Internal _disposed = true; - _socket?.Dispose(); + foreach (var endpoint in _endpoints) + { + endpoint.Socket?.Dispose(); + } } finally { @@ -171,6 +240,57 @@ namespace Apache.Ignite.Internal } } + /// <summary> + /// Gets active connections. + /// </summary> + /// <returns>Active connections.</returns> + public IEnumerable<ConnectionContext> GetConnections() => + _endpoints + .Select(e => e.Socket?.ConnectionContext) + .Where(ctx => ctx != null) + .ToList()!; + + [SuppressMessage( + "Microsoft.Design", + "CA1031:DoNotCatchGeneralExceptionTypes", + Justification = "Secondary connection errors can be ignored.")] + private async Task ConnectAllSockets() + { + if (_endpoints.Count == 1) + { + return; + } + + await _socketLock.WaitAsync().ConfigureAwait(false); + + var tasks = new List<Task>(_endpoints.Count); + + _logger?.Debug("Establishing secondary connections..."); + + try + { + foreach (var endpoint in _endpoints) + { + if (endpoint.Socket?.IsDisposed == false) + { + continue; + } + + tasks.Add(ConnectAsync(endpoint)); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + } + catch (Exception e) + { + _logger?.Warn("Error while trying to establish secondary connections: " + e.Message, e); + } + finally + { + _socketLock.Release(); + } + } + /// <summary> /// Throws if disposed. /// </summary> @@ -190,10 +310,10 @@ namespace Apache.Ignite.Internal List<Exception>? errors = null; var startIdx = (int) Interlocked.Increment(ref _endPointIndex); - for (var i = 0; i < _endPoints.Count; i++) + for (var i = 0; i < _endpoints.Count; i++) { - var idx = (startIdx + i) % _endPoints.Count; - var endPoint = _endPoints[idx]; + var idx = (startIdx + i) % _endpoints.Count; + var endPoint = _endpoints[idx]; if (endPoint.Socket is { IsDisposed: false }) { @@ -219,11 +339,18 @@ namespace Apache.Ignite.Internal /// <summary> /// Connects to the given endpoint. /// </summary> - private async Task<ClientSocket> ConnectAsync(SocketEndpoint endPoint) + private async Task<ClientSocket> ConnectAsync(SocketEndpoint endpoint) { - var socket = await ClientSocket.ConnectAsync(endPoint.EndPoint, Configuration).ConfigureAwait(false); + if (endpoint.Socket?.IsDisposed == false) + { + return endpoint.Socket; + } + + var socket = await ClientSocket.ConnectAsync(endpoint.EndPoint, Configuration).ConfigureAwait(false); + + endpoint.Socket = socket; - endPoint.Socket = socket; + _endpointsMap[socket.ConnectionContext.ClusterNode.Name] = endpoint; return socket; } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs index f8366d2f6..bdee9816f 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs @@ -30,6 +30,7 @@ namespace Apache.Ignite.Internal using Buffers; using Log; using MessagePack; + using Network; using Proto; /// <summary> @@ -38,8 +39,6 @@ namespace Apache.Ignite.Internal // ReSharper disable SuggestBaseTypeForParameter (NetworkStream has more efficient read/write methods). internal sealed class ClientSocket : IDisposable { - private record ConnectionContext(ClientProtocolVersion Version, TimeSpan IdleTimeout); - /** General-purpose client type code. */ private const byte ClientType = 2; @@ -95,13 +94,14 @@ namespace Apache.Ignite.Internal /// </summary> /// <param name="stream">Network stream.</param> /// <param name="configuration">Configuration.</param> - /// <param name="context">Connection context.</param> - private ClientSocket(NetworkStream stream, IgniteClientConfiguration configuration, ConnectionContext context) + /// <param name="connectionContext">Connection context.</param> + private ClientSocket(NetworkStream stream, IgniteClientConfiguration configuration, ConnectionContext connectionContext) { _stream = stream; + ConnectionContext = connectionContext; _logger = configuration.Logger.GetLogger(GetType()); - _heartbeatInterval = GetHeartbeatInterval(configuration.HeartbeatInterval, context.IdleTimeout, _logger); + _heartbeatInterval = GetHeartbeatInterval(configuration.HeartbeatInterval, connectionContext.IdleTimeout, _logger); // ReSharper disable once AsyncVoidLambda (timer callback) _heartbeatTimer = new Timer( @@ -112,9 +112,7 @@ namespace Apache.Ignite.Internal // Because this call is not awaited, execution of the current method continues before the call is completed. // Receive loop runs in the background and should not be awaited. -#pragma warning disable 4014 - RunReceiveLoop(_disposeTokenSource.Token); -#pragma warning restore 4014 + _ = RunReceiveLoop(_disposeTokenSource.Token); } /// <summary> @@ -122,6 +120,11 @@ namespace Apache.Ignite.Internal /// </summary> public bool IsDisposed => _disposeTokenSource.IsCancellationRequested; + /// <summary> + /// Gets the connection context. + /// </summary> + public ConnectionContext ConnectionContext { get; } + /// <summary> /// Connects the socket to the specified endpoint and performs handshake. /// </summary> @@ -132,7 +135,7 @@ namespace Apache.Ignite.Internal "Microsoft.Reliability", "CA2000:Dispose objects before losing scope", Justification = "NetworkStream is returned from this method in the socket.")] - public static async Task<ClientSocket> ConnectAsync(EndPoint endPoint, IgniteClientConfiguration configuration) + public static async Task<ClientSocket> ConnectAsync(IPEndPoint endPoint, IgniteClientConfiguration configuration) { var socket = new Socket(SocketType.Stream, ProtocolType.Tcp) { @@ -148,7 +151,7 @@ namespace Apache.Ignite.Internal var stream = new NetworkStream(socket, ownsSocket: true); - var context = await HandshakeAsync(stream).ConfigureAwait(false); + var context = await HandshakeAsync(stream, endPoint).ConfigureAwait(false); logger?.Debug($"Handshake succeeded. Server protocol version: {context.Version}, idle timeout: {context.IdleTimeout}"); return new ClientSocket(stream, configuration, context); @@ -222,7 +225,8 @@ namespace Apache.Ignite.Internal /// Performs the handshake exchange. /// </summary> /// <param name="stream">Network stream.</param> - private static async Task<ConnectionContext> HandshakeAsync(NetworkStream stream) + /// <param name="endPoint">Endpoint.</param> + private static async Task<ConnectionContext> HandshakeAsync(NetworkStream stream, IPEndPoint endPoint) { await stream.WriteAsync(ProtoCommon.MagicBytes).ConfigureAwait(false); await WriteHandshakeAsync(stream, CurrentProtocolVersion).ConfigureAwait(false); @@ -232,7 +236,7 @@ namespace Apache.Ignite.Internal await CheckMagicBytesAsync(stream).ConfigureAwait(false); using var response = await ReadResponseAsync(stream, new byte[4], CancellationToken.None).ConfigureAwait(false); - return ReadHandshakeResponse(response.GetReader()); + return ReadHandshakeResponse(response.GetReader(), endPoint); } private static async ValueTask CheckMagicBytesAsync(NetworkStream stream) @@ -258,7 +262,7 @@ namespace Apache.Ignite.Internal } } - private static ConnectionContext ReadHandshakeResponse(MessagePackReader reader) + private static ConnectionContext ReadHandshakeResponse(MessagePackReader reader, IPEndPoint endPoint) { var serverVer = new ClientProtocolVersion(reader.ReadInt16(), reader.ReadInt16(), reader.ReadInt16()); @@ -274,12 +278,17 @@ namespace Apache.Ignite.Internal throw exception; } + var idleTimeoutMs = reader.ReadInt64(); + var clusterNodeId = reader.ReadString(); + var clusterNodeName = reader.ReadString(); + reader.Skip(); // Features. reader.Skip(); // Extensions. - var idleTimeoutMs = reader.ReadInt64(); - - return new ConnectionContext(serverVer, TimeSpan.FromMilliseconds(idleTimeoutMs)); + return new ConnectionContext( + serverVer, + TimeSpan.FromMilliseconds(idleTimeoutMs), + new ClusterNode(clusterNodeId, clusterNodeName, endPoint)); } private static IgniteClientException? ReadError(ref MessagePackReader reader) diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs index f80de5b80..348c05c4d 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs @@ -89,21 +89,42 @@ namespace Apache.Ignite.Internal.Compute { IgniteArgumentCheck.NotNull(node, nameof(node)); - using var writer = new PooledArrayBufferWriter(); - Write(); + // Try direct connection to the specified node. + if (_socket.GetEndpoint(node.Name) is { } endpoint) + { + using var writerWithoutNode = new PooledArrayBufferWriter(); + Write(writerWithoutNode, writeNode: false); + + using var res1 = await _socket.TryDoOutInOpAsync(endpoint, ClientOp.ComputeExecute, writerWithoutNode) + .ConfigureAwait(false); + + // Result is null when there was a connection issue, but retry policy allows another try. + if (res1 != null) + { + return Read(res1.Value); + } + } + + // When direct connection is not available, use default connection and pass target node info to the server. + using var writerWithNode = new PooledArrayBufferWriter(); + Write(writerWithNode, writeNode: true); - using var resBuf = await _socket.DoOutInOpAsync(ClientOp.ComputeExecute, writer).ConfigureAwait(false); + using var res2 = await _socket.DoOutInOpAsync(ClientOp.ComputeExecute, writerWithNode).ConfigureAwait(false); - return Read(); + return Read(res2); - void Write() + void Write(PooledArrayBufferWriter writer, bool writeNode) { var w = writer.GetMessageWriter(); - w.Write(node.Id); - w.Write(node.Name); - w.Write(node.Address.Address.ToString()); - w.Write(node.Address.Port); + if (writeNode) + { + w.Write(node.Name); + } + else + { + w.WriteNil(); + } w.Write(jobClassName); w.WriteObjectArrayWithTypes(args); @@ -111,9 +132,9 @@ namespace Apache.Ignite.Internal.Compute w.Flush(); } - T Read() + static T Read(in PooledBuffer buf) { - var reader = resBuf.GetReader(); + var reader = buf.GetReader(); return (T)reader.ReadObjectWithType()!; } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs similarity index 64% copy from modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs copy to modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs index 54c0df714..505eacb2c 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ConnectionContext.cs @@ -15,22 +15,19 @@ * limitations under the License. */ -namespace Apache.Ignite.Internal.Table -{ - using System.Collections.Generic; - - // XMLDoc check fails on older SDKs: https://github.com/dotnet/roslyn/issues/44571. +// XMLDoc check fails on older SDKs: https://github.com/dotnet/roslyn/issues/44571. #pragma warning disable CS1572 #pragma warning disable CS1573 +namespace Apache.Ignite.Internal +{ + using System; + using Ignite.Network; /// <summary> - /// Schema. + /// Socket connection context. /// </summary> - /// <param name="Version">Version.</param> - /// <param name="KeyColumnCount">Key column count.</param> - /// <param name="Columns">Columns in schema order.</param> - internal record Schema( - int Version, - int KeyColumnCount, - IReadOnlyList<Column> Columns); + /// <param name="Version">Protocol version.</param> + /// <param name="IdleTimeout">Server idle timeout.</param> + /// <param name="ClusterNode">Cluster node.</param> + internal record ConnectionContext(ClientProtocolVersion Version, TimeSpan IdleTimeout, IClusterNode ClusterNode); } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs index 22fd69abf..20dededa1 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Internal { using System.Collections.Generic; + using System.Linq; using System.Net; using System.Threading.Tasks; using Ignite.Compute; @@ -86,6 +87,10 @@ namespace Apache.Ignite.Internal } } + /// <inheritdoc/> + public IList<IClusterNode> GetConnections() => + _socket.GetConnections().Select(ctx => ctx.ClusterNode).ToList(); + /// <inheritdoc/> public void Dispose() { diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs index 54c0df714..836e32cd9 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs @@ -15,14 +15,13 @@ * limitations under the License. */ +// XMLDoc check fails on older SDKs: https://github.com/dotnet/roslyn/issues/44571. +#pragma warning disable CS1572 +#pragma warning disable CS1573 namespace Apache.Ignite.Internal.Table { using System.Collections.Generic; - // XMLDoc check fails on older SDKs: https://github.com/dotnet/roslyn/issues/44571. -#pragma warning disable CS1572 -#pragma warning disable CS1573 - /// <summary> /// Schema. /// </summary>