Repository: ignite Updated Branches: refs/heads/master c9106fb0a -> 629327067
IGNITE-8447: .Net support of DataStreamer#perThreadBufferSize - Fixes #3952. Signed-off-by: Nikolay Izhikov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/62932706 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/62932706 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/62932706 Branch: refs/heads/master Commit: 629327067197f4f1577b5f24e81c821474acb0ab Parents: c9106fb Author: Nikolay Izhikov <[email protected]> Authored: Tue May 8 10:03:14 2018 +0300 Committer: Nikolay Izhikov <[email protected]> Committed: Tue May 8 10:03:14 2018 +0300 ---------------------------------------------------------------------- .../datastreamer/PlatformDataStreamer.java | 14 +++++++ .../ApiParity/StreamerParityTest.cs | 5 +-- .../Dataload/DataStreamerTest.cs | 7 ++++ .../Datastream/DataStreamerDefaults.cs | 5 +++ .../Datastream/IDataStreamer.cs | 17 ++++++++ .../Impl/Datastream/DataStreamerImpl.cs | 41 ++++++++++++++++++++ 6 files changed, 86 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index 8cd14c7..9f57ce0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -92,6 +92,12 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { /** */ private static final int OP_SET_TIMEOUT = 13; + /** */ + private static final int OP_PER_THREAD_BUFFER_SIZE = 14; + + /** */ + private static final int OP_SET_PER_THREAD_BUFFER_SIZE = 15; + /** Cache name. */ private final String cacheName; @@ -186,6 +192,11 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { ldr.perNodeBufferSize((int) val); return TRUE; + + case OP_SET_PER_THREAD_BUFFER_SIZE: + ldr.perThreadBufferSize((int) val); + + return TRUE; case OP_SET_SKIP_STORE: ldr.skipStore(val == TRUE); @@ -230,6 +241,9 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { case OP_PER_NODE_BUFFER_SIZE: return ldr.perNodeBufferSize(); + + case OP_PER_THREAD_BUFFER_SIZE: + return ldr.perThreadBufferSize(); case OP_SKIP_STORE: return ldr.skipStore() ? TRUE : FALSE; http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs index 9af0561..3cd58e8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs @@ -29,8 +29,7 @@ namespace Apache.Ignite.Core.Tests.ApiParity /** Members that are not needed on .NET side. */ private static readonly string[] UnneededMembers = { - "deployClass", - "perThreadBufferSize" + "deployClass" }; /** Known name mappings. */ @@ -52,4 +51,4 @@ namespace Apache.Ignite.Core.Tests.ApiParity UnneededMembers, knownMappings: KnownMappings); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs index 60a1067..a3c804d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs @@ -115,6 +115,12 @@ namespace Apache.Ignite.Core.Tests.Dataload Assert.AreEqual(1, ldr.PerNodeBufferSize); ldr.PerNodeBufferSize = 2; Assert.AreEqual(2, ldr.PerNodeBufferSize); + + Assert.AreEqual(DataStreamerDefaults.DefaultPerThreadBufferSize, ldr.PerThreadBufferSize); + ldr.PerThreadBufferSize = 1; + Assert.AreEqual(1, ldr.PerThreadBufferSize); + ldr.PerThreadBufferSize = 2; + Assert.AreEqual(2, ldr.PerThreadBufferSize); Assert.AreEqual(0, ldr.PerNodeParallelOperations); var ops = DataStreamerDefaults.DefaultParallelOperationsMultiplier * @@ -271,6 +277,7 @@ namespace Apache.Ignite.Core.Tests.Dataload Assert.IsFalse(task.IsCompleted); ldr.PerNodeBufferSize = 2; + ldr.PerThreadBufferSize = 1; ldr.AddData(part2[0], part2[0]); ldr.AddData(part1[1], part1[1]); http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs index 315ae7f..aa0d114 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs @@ -28,6 +28,11 @@ namespace Apache.Ignite.Core.Datastream /// The default per node buffer size, see <see cref="IDataStreamer{TK,TV}.PerNodeBufferSize"/>. /// </summary> public const int DefaultPerNodeBufferSize = 512; + + /// <summary> + /// The default per thread buffer size, see <see cref="IDataStreamer{TK,TV}.PerThreadBufferSize"/>. + /// </summary> + public const int DefaultPerThreadBufferSize = 4096; /// <summary> /// Default multiplier for parallel operations per node: http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs index 277130c..9ba0193 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs @@ -52,6 +52,13 @@ namespace Apache.Ignite.Core.Datastream /// remote node. Default value is 1024.</description> /// </item> /// <item> + /// <term>PerThreadBufferSize</term> + /// <description>When entries are added to data streamer they are not sent to Ignite + /// right away and are buffered internally on per thread basis for better performance and network utilization. + /// This setting controls the size of internal per-thread buffer before buffered data is sent to + /// remote node. Default value is 4096.</description> + /// </item> + /// <item> /// <term>PerNodeParallelOperations</term> /// <description>Sometimes data may be added to the data streamer faster than it can be put /// in cache. In this case, new buffered load messages are sent to remote nodes before @@ -117,6 +124,16 @@ namespace Apache.Ignite.Core.Datastream int PerNodeBufferSize { get; set; } /// <summary> + /// Size of per thread key-value pairs buffer. + /// <para /> + /// Setter must be called before any add/remove operation. + /// <para /> + /// Default is <see cref="DataStreamerDefaults.DefaultPerThreadBufferSize"/>. + /// </summary> + [DefaultValue(DataStreamerDefaults.DefaultPerThreadBufferSize)] + int PerThreadBufferSize { get; set; } + + /// <summary> /// Maximum number of parallel load operations for a single node. /// <para /> /// Setter must be called before any add/remove operation. http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs index 7aaa84a..755d1f1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -99,6 +99,12 @@ namespace Apache.Ignite.Core.Impl.Datastream /** */ private const int OpSetTimeout = 13; + /** */ + private const int OpPerThreadBufferSize = 14; + + /** */ + private const int OpSetPerThreadBufferSize = 15; + /** Cache name. */ private readonly string _cacheName; @@ -283,6 +289,41 @@ namespace Apache.Ignite.Core.Impl.Datastream } } } + + /** <inheritDoc /> */ + public int PerThreadBufferSize + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return (int) DoOutInOp(OpPerThreadBufferSize); + } + finally + { + _rwLock.ExitReadLock(); + } + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + DoOutInOp(OpSetPerThreadBufferSize, value); + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } /** <inheritDoc /> */ public int PerNodeParallelOperations
