Repository: ignite Updated Branches: refs/heads/ignite-1.6 b50eb654c -> 003fe5e35
IGNITE-3108: .NET: Added communication SPI stubs. This closes #702. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/003fe5e3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/003fe5e3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/003fe5e3 Branch: refs/heads/ignite-1.6 Commit: 003fe5e35aaf85d8ed5696f36eff5618f916e166 Parents: b50eb65 Author: Pavel Tupitsyn <[email protected]> Authored: Wed May 11 18:26:08 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed May 11 18:26:08 2016 +0300 ---------------------------------------------------------------------- .../utils/PlatformConfigurationUtils.java | 54 ++++ .../IgniteConfigurationSerializerTest.cs | 27 ++ .../IgniteConfigurationTest.cs | 44 ++- .../Apache.Ignite.Core.csproj | 2 + .../Communication/ICommunicationSpi.cs | 37 +++ .../Communication/Tcp/TcpCommunicationSpi.cs | 283 +++++++++++++++++++ .../Apache.Ignite.Core/IgniteConfiguration.cs | 28 ++ .../IgniteConfigurationSection.xsd | 22 ++ 8 files changed, 496 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 5ee19c1..30e45ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -42,6 +42,9 @@ import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative; import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -351,6 +354,30 @@ public class PlatformConfigurationUtils { readDiscoveryConfiguration(in, cfg); if (in.readBoolean()) { + TcpCommunicationSpi comm = new TcpCommunicationSpi(); + + comm.setAckSendThreshold(in.readInt()); + comm.setConnectTimeout(in.readLong()); + comm.setDirectBuffer(in.readBoolean()); + comm.setDirectSendBuffer(in.readBoolean()); + comm.setIdleConnectionTimeout(in.readLong()); + comm.setLocalAddress(in.readString()); + comm.setLocalPort(in.readInt()); + comm.setLocalPortRange(in.readInt()); + comm.setMaxConnectTimeout(in.readLong()); + comm.setMessageQueueLimit(in.readInt()); + comm.setReconnectCount(in.readInt()); + comm.setSelectorsCount(in.readInt()); + comm.setSlowClientQueueLimit(in.readInt()); + comm.setSocketReceiveBuffer(in.readInt()); + comm.setSocketSendBuffer(in.readInt()); + comm.setTcpNoDelay(in.readBoolean()); + comm.setUnacknowledgedMessagesBufferSize(in.readInt()); + + cfg.setCommunicationSpi(comm); + } + + if (in.readBoolean()) { if (cfg.getBinaryConfiguration() == null) cfg.setBinaryConfiguration(new BinaryConfiguration()); @@ -687,6 +714,33 @@ public class PlatformConfigurationUtils { writeDiscoveryConfiguration(w, cfg.getDiscoverySpi()); + CommunicationSpi comm = cfg.getCommunicationSpi(); + + if (comm instanceof TcpCommunicationSpi) { + w.writeBoolean(true); + TcpCommunicationSpiMBean tcp = (TcpCommunicationSpiMBean) comm; + + w.writeInt(tcp.getAckSendThreshold()); + w.writeLong(tcp.getConnectTimeout()); + w.writeBoolean(tcp.isDirectBuffer()); + w.writeBoolean(tcp.isDirectSendBuffer()); + w.writeLong(tcp.getIdleConnectionTimeout()); + w.writeString(tcp.getLocalAddress()); + w.writeInt(tcp.getLocalPort()); + w.writeInt(tcp.getLocalPortRange()); + w.writeLong(tcp.getMaxConnectTimeout()); + w.writeInt(tcp.getMessageQueueLimit()); + w.writeInt(tcp.getReconnectCount()); + w.writeInt(tcp.getSelectorsCount()); + w.writeInt(tcp.getSlowClientQueueLimit()); + w.writeInt(tcp.getSocketReceiveBuffer()); + w.writeInt(tcp.getSocketSendBuffer()); + w.writeBoolean(tcp.isTcpNoDelay()); + w.writeInt(tcp.getUnacknowledgedMessagesBufferSize()); + } + else + w.writeBoolean(false); + BinaryConfiguration bc = cfg.getBinaryConfiguration(); w.writeBoolean(bc != null); http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs index 36bda2e..e3507b8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs @@ -33,6 +33,7 @@ namespace Apache.Ignite.Core.Tests using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Cache.Store; using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Communication.Tcp; using Apache.Ignite.Core.DataStructures.Configuration; using Apache.Ignite.Core.Discovery.Tcp; using Apache.Ignite.Core.Discovery.Tcp.Multicast; @@ -65,6 +66,7 @@ namespace Apache.Ignite.Core.Tests <discoverySpi type='TcpDiscoverySpi' joinTimeout='0:1:0'> <ipFinder type='TcpDiscoveryMulticastIpFinder' addressRequestAttempts='7' /> </discoverySpi> + <communicationSpi type='TcpCommunicationSpi' ackSendThreshold='33' idleConnectionTimeout='0:1:2' /> <jvmOptions><string>-Xms1g</string><string>-Xmx4g</string></jvmOptions> <lifecycleBeans> <iLifecycleBean type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+LifecycleBean, Apache.Ignite.Core.Tests' foo='15' /> @@ -154,6 +156,11 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(new TimeSpan(0,1,2), tx.DefaultTimeout); Assert.AreEqual(15, tx.PessimisticTransactionLogSize); Assert.AreEqual(TimeSpan.FromSeconds(33), tx.PessimisticTransactionLogLinger); + + var comm = cfg.CommunicationSpi as TcpCommunicationSpi; + Assert.IsNotNull(comm); + Assert.AreEqual(33, comm.AckSendThreshold); + Assert.AreEqual(new TimeSpan(0, 1, 2), comm.IdleConnectionTimeout); } /// <summary> @@ -445,6 +452,26 @@ namespace Apache.Ignite.Core.Tests DefaultTimeout = TimeSpan.FromDays(2), DefaultTransactionConcurrency = TransactionConcurrency.Optimistic, PessimisticTransactionLogLinger = TimeSpan.FromHours(3) + }, + CommunicationSpi = new TcpCommunicationSpi + { + LocalPort = 47501, + MaxConnectTimeout = TimeSpan.FromSeconds(34), + MessageQueueLimit = 15, + ConnectTimeout = TimeSpan.FromSeconds(17), + IdleConnectionTimeout = TimeSpan.FromSeconds(19), + SelectorsCount = 8, + ReconnectCount = 33, + SocketReceiveBufferSize = 512, + AckSendThreshold = 99, + DirectBuffer = false, + DirectSendBuffer = true, + LocalPortRange = 45, + LocalAddress = "127.0.0.1", + TcpNoDelay = false, + SlowClientQueueLimit = 98, + SocketSendBufferSize = 2045, + UnacknowledgedMessagesBufferSize = 3450 } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs index 45b9a05..3e5e877 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs @@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Tests using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Communication.Tcp; using Apache.Ignite.Core.DataStructures.Configuration; using Apache.Ignite.Core.Discovery.Tcp; using Apache.Ignite.Core.Discovery.Tcp.Multicast; @@ -67,6 +68,7 @@ namespace Apache.Ignite.Core.Tests CheckDefaultValueAttributes(new TcpDiscoverySpi()); CheckDefaultValueAttributes(new CacheConfiguration()); CheckDefaultValueAttributes(new TcpDiscoveryMulticastIpFinder()); + CheckDefaultValueAttributes(new TcpCommunicationSpi()); } /// <summary> @@ -127,6 +129,26 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(tx.DefaultTransactionIsolation, resTx.DefaultTransactionIsolation); Assert.AreEqual(tx.PessimisticTransactionLogLinger, resTx.PessimisticTransactionLogLinger); Assert.AreEqual(tx.PessimisticTransactionLogSize, resTx.PessimisticTransactionLogSize); + + var com = (TcpCommunicationSpi) cfg.CommunicationSpi; + var resCom = (TcpCommunicationSpi) resCfg.CommunicationSpi; + Assert.AreEqual(com.AckSendThreshold, resCom.AckSendThreshold); + Assert.AreEqual(com.ConnectTimeout, resCom.ConnectTimeout); + Assert.AreEqual(com.DirectBuffer, resCom.DirectBuffer); + Assert.AreEqual(com.DirectSendBuffer, resCom.DirectSendBuffer); + Assert.AreEqual(com.IdleConnectionTimeout, resCom.IdleConnectionTimeout); + Assert.AreEqual(com.LocalAddress, resCom.LocalAddress); + Assert.AreEqual(com.LocalPort, resCom.LocalPort); + Assert.AreEqual(com.LocalPortRange, resCom.LocalPortRange); + Assert.AreEqual(com.MaxConnectTimeout, resCom.MaxConnectTimeout); + Assert.AreEqual(com.MessageQueueLimit, resCom.MessageQueueLimit); + Assert.AreEqual(com.ReconnectCount, resCom.ReconnectCount); + Assert.AreEqual(com.SelectorsCount, resCom.SelectorsCount); + Assert.AreEqual(com.SlowClientQueueLimit, resCom.SlowClientQueueLimit); + Assert.AreEqual(com.SocketReceiveBufferSize, resCom.SocketReceiveBufferSize); + Assert.AreEqual(com.SocketSendBufferSize, resCom.SocketSendBufferSize); + Assert.AreEqual(com.TcpNoDelay, resCom.TcpNoDelay); + Assert.AreEqual(com.UnacknowledgedMessagesBufferSize, resCom.UnacknowledgedMessagesBufferSize); } } @@ -323,7 +345,7 @@ namespace Apache.Ignite.Core.Tests { var props = obj.GetType().GetProperties(); - foreach (var prop in props) + foreach (var prop in props.Where(p => p.Name != "SelectorsCount")) { var attr = prop.GetCustomAttributes(true).OfType<DefaultValueAttribute>().FirstOrDefault(); var propValue = prop.GetValue(obj, null); @@ -385,6 +407,26 @@ namespace Apache.Ignite.Core.Tests DefaultTransactionIsolation = TransactionIsolation.Serializable, PessimisticTransactionLogLinger = TimeSpan.FromHours(1), PessimisticTransactionLogSize = 240 + }, + CommunicationSpi = new TcpCommunicationSpi + { + LocalPort = 47501, + MaxConnectTimeout = TimeSpan.FromSeconds(34), + MessageQueueLimit = 15, + ConnectTimeout = TimeSpan.FromSeconds(17), + IdleConnectionTimeout = TimeSpan.FromSeconds(19), + SelectorsCount = 8, + ReconnectCount = 33, + SocketReceiveBufferSize = 512, + AckSendThreshold = 99, + DirectBuffer = false, + DirectSendBuffer = true, + LocalPortRange = 45, + LocalAddress = "127.0.0.1", + TcpNoDelay = false, + SlowClientQueueLimit = 98, + SocketSendBufferSize = 2045, + UnacknowledgedMessagesBufferSize = 3450 } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 409a7cf..8943030 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -93,6 +93,8 @@ <Compile Include="Cache\CachePartialUpdateException.cs" /> <Compile Include="Cache\CachePeekMode.cs" /> <Compile Include="Cache\Configuration\NearCacheConfiguration.cs" /> + <Compile Include="Communication\ICommunicationSpi.cs" /> + <Compile Include="Communication\Tcp\TcpCommunicationSpi.cs" /> <Compile Include="DataStructures\Configuration\AtomicConfiguration.cs" /> <Compile Include="Cache\Configuration\QueryAlias.cs" /> <Compile Include="Cache\Configuration\QueryTextFieldAttribute.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs new file mode 100644 index 0000000..bd3f51d --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Communication +{ + using System.Diagnostics.CodeAnalysis; + using Apache.Ignite.Core.Communication.Tcp; + + /// <summary> + /// Communication SPI is responsible for data exchange between nodes. + /// <para /> + /// Communication SPI is one of the most important SPIs in Ignite. It is used + /// heavily throughout the system and provides means for all data exchanges + /// between nodes, such as internal implementation details and user driven messages. + /// <para /> + /// Only predefined implementation is supported now: <see cref="TcpCommunicationSpi"/>. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1040:AvoidEmptyInterfaces")] + public interface ICommunicationSpi + { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs new file mode 100644 index 0000000..afd3b57 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Communication.Tcp +{ + using System; + using System.ComponentModel; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Binary; + + /// <summary> + /// <see cref="TcpCommunicationSpi"/> is default communication SPI which uses + /// TCP/IP protocol and Java NIO to communicate with other nodes. + /// <para /> + /// At startup, this SPI tries to start listening to local port specified by + /// <see cref="LocalPort"/> property. If local port is occupied, then SPI will + /// automatically increment the port number until it can successfully bind for + /// listening. <see cref="LocalPortRange"/> configuration parameter controls + /// maximum number of ports that SPI will try before it fails. Port range comes + /// very handy when starting multiple grid nodes on the same machine or even + /// in the same VM. In this case all nodes can be brought up without a single + /// change in configuration. + /// </summary> + public class TcpCommunicationSpi : ICommunicationSpi + { + /// <summary> Default value of <see cref="AckSendThreshold"/> property. </summary> + public const int DefaultAckSendThreshold = 16; + + /// <summary> Default value of <see cref="ConnectTimeout"/> property. </summary> + public static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(5); + + /// <summary> Default value of <see cref="DirectBuffer"/> property. </summary> + public const bool DefaultDirectBuffer = true; + + /// <summary> Default value of <see cref="DirectSendBuffer"/> property. </summary> + public const bool DefaultDirectSendBuffer = false; + + /// <summary> Default value of <see cref="IdleConnectionTimeout"/> property. </summary> + public static readonly TimeSpan DefaultIdleConnectionTimeout = TimeSpan.FromSeconds(30); + + /// <summary> Default value of <see cref="LocalPort"/> property. </summary> + public const int DefaultLocalPort = 47100; + + /// <summary> Default value of <see cref="LocalPortRange"/> property. </summary> + public const int DefaultLocalPortRange = 100; + + /// <summary> Default value of <see cref="MaxConnectTimeout"/> property. </summary> + public static readonly TimeSpan DefaultMaxConnectTimeout = TimeSpan.FromMinutes(10); + + /// <summary> Default value of <see cref="MessageQueueLimit"/> property. </summary> + public const int DefaultMessageQueueLimit = 1024; + + /// <summary> Default value of <see cref="ReconnectCount"/> property. </summary> + public const int DefaultReconnectCount = 10; + + /// <summary> Default value of <see cref="SelectorsCount"/> property. </summary> + public static readonly int DefaultSelectorsCount = Math.Min(4, Environment.ProcessorCount); + + /// <summary> Default socket buffer size. </summary> + public const int DefaultSocketBufferSize = 32 * 1024; + + /// <summary> Default value of <see cref="TcpNoDelay"/> property. </summary> + public const bool DefaultTcpNoDelay = true; + + /// <summary> + /// Initializes a new instance of the <see cref="TcpCommunicationSpi"/> class. + /// </summary> + public TcpCommunicationSpi() + { + AckSendThreshold = DefaultAckSendThreshold; + ConnectTimeout = DefaultConnectTimeout; + DirectBuffer = DefaultDirectBuffer; + DirectSendBuffer = DefaultDirectSendBuffer; + IdleConnectionTimeout = DefaultIdleConnectionTimeout; + LocalPort = DefaultLocalPort; + LocalPortRange = DefaultLocalPortRange; + MaxConnectTimeout = DefaultMaxConnectTimeout; + MessageQueueLimit = DefaultMessageQueueLimit; + ReconnectCount = DefaultReconnectCount; + SelectorsCount = DefaultSelectorsCount; + SocketReceiveBufferSize = DefaultSocketBufferSize; + SocketSendBufferSize = DefaultSocketBufferSize; + TcpNoDelay = DefaultTcpNoDelay; + } + + /// <summary> + /// Initializes a new instance of the <see cref="TcpCommunicationSpi"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + internal TcpCommunicationSpi(BinaryReader reader) + { + AckSendThreshold = reader.ReadInt(); + ConnectTimeout = reader.ReadLongAsTimespan(); + DirectBuffer = reader.ReadBoolean(); + DirectSendBuffer = reader.ReadBoolean(); + IdleConnectionTimeout = reader.ReadLongAsTimespan(); + LocalAddress = reader.ReadString(); + LocalPort = reader.ReadInt(); + LocalPortRange = reader.ReadInt(); + MaxConnectTimeout = reader.ReadLongAsTimespan(); + MessageQueueLimit = reader.ReadInt(); + ReconnectCount = reader.ReadInt(); + SelectorsCount = reader.ReadInt(); + SlowClientQueueLimit = reader.ReadInt(); + SocketReceiveBufferSize = reader.ReadInt(); + SocketSendBufferSize = reader.ReadInt(); + TcpNoDelay = reader.ReadBoolean(); + UnacknowledgedMessagesBufferSize = reader.ReadInt(); + } + + /// <summary> + /// Gets or sets the number of received messages per connection to node + /// after which acknowledgment message is sent. + /// </summary> + [DefaultValue(DefaultAckSendThreshold)] + public int AckSendThreshold { get; set; } + + /// <summary> + /// Gets or sets the connect timeout used when establishing connection with remote nodes. + /// </summary> + [DefaultValue(typeof(TimeSpan), "00:00:05")] + public TimeSpan ConnectTimeout { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether to allocate direct (ByteBuffer.allocateDirect) + /// or heap (ByteBuffer.allocate) buffer. + /// </summary> + [DefaultValue(DefaultDirectBuffer)] + public bool DirectBuffer { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether to allocate direct (ByteBuffer.allocateDirect) + /// or heap (ByteBuffer.allocate) send buffer. + /// </summary> + [DefaultValue(DefaultDirectSendBuffer)] + public bool DirectSendBuffer { get; set; } + + /// <summary> + /// Sets maximum idle connection timeout upon which a connection to client will be closed. + /// </summary> + [DefaultValue(typeof(TimeSpan), "00:00:30")] + public TimeSpan IdleConnectionTimeout { get; set; } + + /// <summary> + /// Gets or sets the local host address for socket binding. Note that one node could have + /// additional addresses beside the loopback one. This configuration parameter is optional. + /// </summary> + public string LocalAddress { get; set; } + + /// <summary> + /// Gets or sets the local port for socket binding. + /// </summary> + [DefaultValue(DefaultLocalPort)] + public int LocalPort { get; set; } + + /// <summary> + /// Gets or sets local port range for local host ports (value must greater than or equal to <tt>0</tt>). + /// If provided local port <see cref="LocalPort"/> is occupied, + /// implementation will try to increment the port number for as long as it is less than + /// initial value plus this range. + /// <para /> + /// If port range value is <c>0</c>, then implementation will try bind only to the port provided by + /// <see cref="LocalPort"/> method and fail if binding to this port did not succeed. + /// </summary> + [DefaultValue(DefaultLocalPortRange)] + public int LocalPortRange { get; set; } + + /// <summary> + /// Gets or sets maximum connect timeout. If handshake is not established within connect timeout, + /// then SPI tries to repeat handshake procedure with increased connect timeout. + /// Connect timeout can grow till maximum timeout value, + /// if maximum timeout value is reached then the handshake is considered as failed. + /// <para /> + /// <c>0</c> is interpreted as infinite timeout. + /// </summary> + [DefaultValue(typeof(TimeSpan), "00:10:00")] + public TimeSpan MaxConnectTimeout { get; set; } + + /// <summary> + /// Gets or sets the message queue limit for incoming and outgoing messages. + /// <para /> + /// When set to positive number send queue is limited to the configured value. + /// <c>0</c> disables the limitation. + /// </summary> + [DefaultValue(DefaultMessageQueueLimit)] + public int MessageQueueLimit { get; set; } + + /// <summary> + /// Gets or sets the maximum number of reconnect attempts used when establishing connection with remote nodes. + /// </summary> + [DefaultValue(DefaultReconnectCount)] + public int ReconnectCount { get; set; } + + /// <summary> + /// Gets or sets the count of selectors te be used in TCP server. + /// <para /> + /// Default value is <see cref="DefaultSelectorsCount"/>, which is calculated as + /// <c>Math.Min(4, Environment.ProcessorCount)</c> + /// </summary> + public int SelectorsCount { get; set; } + + /// <summary> + /// Gets or sets slow client queue limit. + /// <para/> + /// When set to a positive number, communication SPI will monitor clients outbound message queue sizes + /// and will drop those clients whose queue exceeded this limit. + /// <para/> + /// Usually this value should be set to the same value as <see cref="MessageQueueLimit"/> which controls + /// message back-pressure for server nodes. The default value for this parameter is <c>0</c> + /// which means unlimited. + /// </summary> + public int SlowClientQueueLimit { get; set; } + + /// <summary> + /// Gets or sets the size of the socket receive buffer. + /// </summary> + [DefaultValue(DefaultSocketBufferSize)] + public int SocketReceiveBufferSize { get; set; } + + /// <summary> + /// Gets or sets the size of the socket send buffer. + /// </summary> + [DefaultValue(DefaultSocketBufferSize)] + public int SocketSendBufferSize { get; set; } + + /// <summary> + /// Gets or sets the value for <c>TCP_NODELAY</c> socket option. Each + /// socket will be opened using provided value. + /// <para /> + /// Setting this option to <c>true</c> disables Nagle's algorithm + /// for socket decreasing latency and delivery time for small messages. + /// <para /> + /// For systems that work under heavy network load it is advisable to set this value to <c>false</c>. + /// </summary> + [DefaultValue(DefaultTcpNoDelay)] + public bool TcpNoDelay { get; set; } + + /// <summary> + /// Gets or sets the maximum number of stored unacknowledged messages per connection to node. + /// If number of unacknowledged messages exceeds this number + /// then connection to node is closed and reconnect is attempted. + /// </summary> + public int UnacknowledgedMessagesBufferSize { get; set; } + + /// <summary> + /// Writes this instance to the specified writer. + /// </summary> + internal void Write(IBinaryRawWriter writer) + { + writer.WriteInt(AckSendThreshold); + writer.WriteLong((long) ConnectTimeout.TotalMilliseconds); + writer.WriteBoolean(DirectBuffer); + writer.WriteBoolean(DirectSendBuffer); + writer.WriteLong((long) IdleConnectionTimeout.TotalMilliseconds); + writer.WriteString(LocalAddress); + writer.WriteInt(LocalPort); + writer.WriteInt(LocalPortRange); + writer.WriteLong((long) MaxConnectTimeout.TotalMilliseconds); + writer.WriteInt(MessageQueueLimit); + writer.WriteInt(ReconnectCount); + writer.WriteInt(SelectorsCount); + writer.WriteInt(SlowClientQueueLimit); + writer.WriteInt(SocketReceiveBufferSize); + writer.WriteInt(SocketSendBufferSize); + writer.WriteBoolean(TcpNoDelay); + writer.WriteInt(UnacknowledgedMessagesBufferSize); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs index e14c15b..62cad19 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs @@ -29,6 +29,8 @@ using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Communication; + using Apache.Ignite.Core.Communication.Tcp; using Apache.Ignite.Core.DataStructures.Configuration; using Apache.Ignite.Core.Discovery; using Apache.Ignite.Core.Discovery.Tcp; @@ -220,6 +222,23 @@ else writer.WriteBoolean(false); + // Communication config + var comm = CommunicationSpi; + + if (comm != null) + { + writer.WriteBoolean(true); + + var tcpComm = comm as TcpCommunicationSpi; + + if (tcpComm == null) + throw new InvalidOperationException("Unsupported communication SPI: " + comm.GetType()); + + tcpComm.Write(writer); + } + else + writer.WriteBoolean(false); + // Binary config var isCompactFooterSet = BinaryConfiguration != null && BinaryConfiguration.CompactFooterInternal != null; @@ -302,6 +321,9 @@ // Discovery config DiscoverySpi = r.ReadBoolean() ? new TcpDiscoverySpi(r) : null; + // Communication config + CommunicationSpi = r.ReadBoolean() ? new TcpCommunicationSpi(r) : null; + // Binary config if (r.ReadBoolean()) { @@ -478,6 +500,12 @@ public IDiscoverySpi DiscoverySpi { get; set; } /// <summary> + /// Gets or sets the communication service provider. + /// Null for default communication. + /// </summary> + public ICommunicationSpi CommunicationSpi { get; set; } + + /// <summary> /// Gets or sets a value indicating whether node should start in client mode. /// Client node cannot hold data in the caches. /// </summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index a0df870..29074e7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -262,6 +262,28 @@ <xs:attribute name="type" type="xs:string" use="required" /> </xs:complexType> </xs:element> + <xs:element name="communicationSpi" minOccurs="0"> + <xs:complexType> + <xs:attribute name="ackSendThreshold" type="xs:int" /> + <xs:attribute name="connectTimeout" type="xs:string" /> + <xs:attribute name="directBuffer" type="xs:boolean" /> + <xs:attribute name="directSendBuffer" type="xs:boolean" /> + <xs:attribute name="idleConnectionTimeout" type="xs:string" /> + <xs:attribute name="localAddress" type="xs:string" /> + <xs:attribute name="localPort" type="xs:int" /> + <xs:attribute name="localPortRange" type="xs:int" /> + <xs:attribute name="maxConnectTimeout" type="xs:string" /> + <xs:attribute name="messageQueueLimit" type="xs:string" /> + <xs:attribute name="reconnectCount" type="xs:int" /> + <xs:attribute name="selectorsCount" type="xs:int" /> + <xs:attribute name="slowClientQueueLimit" type="xs:int" /> + <xs:attribute name="socketReceiveBufferSize" type="xs:int" /> + <xs:attribute name="socketSendBufferSize" type="xs:int" /> + <xs:attribute name="tcpNoDelay" type="xs:boolean" /> + <xs:attribute name="unacknowledgedMessagesBufferSize" type="xs:int" /> + <xs:attribute name="type" type="xs:string" use="required" /> + </xs:complexType> + </xs:element> <xs:element name="includedEventTypes" minOccurs="0"> <xs:complexType> <xs:sequence>
