Repository: ignite
Updated Branches:
  refs/heads/master c9106fb0a -> 629327067


IGNITE-8447: .Net support of DataStreamer#perThreadBufferSize - Fixes #3952.

Signed-off-by: Nikolay Izhikov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/62932706
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/62932706
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/62932706

Branch: refs/heads/master
Commit: 629327067197f4f1577b5f24e81c821474acb0ab
Parents: c9106fb
Author: Nikolay Izhikov <[email protected]>
Authored: Tue May 8 10:03:14 2018 +0300
Committer: Nikolay Izhikov <[email protected]>
Committed: Tue May 8 10:03:14 2018 +0300

----------------------------------------------------------------------
 .../datastreamer/PlatformDataStreamer.java      | 14 +++++++
 .../ApiParity/StreamerParityTest.cs             |  5 +--
 .../Dataload/DataStreamerTest.cs                |  7 ++++
 .../Datastream/DataStreamerDefaults.cs          |  5 +++
 .../Datastream/IDataStreamer.cs                 | 17 ++++++++
 .../Impl/Datastream/DataStreamerImpl.cs         | 41 ++++++++++++++++++++
 6 files changed, 86 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 8cd14c7..9f57ce0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -92,6 +92,12 @@ public class PlatformDataStreamer extends 
PlatformAbstractTarget {
     /** */
     private static final int OP_SET_TIMEOUT = 13;
 
+    /** */
+    private static final int OP_PER_THREAD_BUFFER_SIZE = 14;
+
+    /** */
+    private static final int OP_SET_PER_THREAD_BUFFER_SIZE = 15;
+
     /** Cache name. */
     private final String cacheName;
 
@@ -186,6 +192,11 @@ public class PlatformDataStreamer extends 
PlatformAbstractTarget {
                 ldr.perNodeBufferSize((int) val);
 
                 return TRUE;
+            
+            case OP_SET_PER_THREAD_BUFFER_SIZE:
+                ldr.perThreadBufferSize((int) val);
+
+                return TRUE;
 
             case OP_SET_SKIP_STORE:
                 ldr.skipStore(val == TRUE);
@@ -230,6 +241,9 @@ public class PlatformDataStreamer extends 
PlatformAbstractTarget {
 
             case OP_PER_NODE_BUFFER_SIZE:
                 return ldr.perNodeBufferSize();
+            
+            case OP_PER_THREAD_BUFFER_SIZE:
+                return ldr.perThreadBufferSize();
 
             case OP_SKIP_STORE:
                 return ldr.skipStore() ? TRUE : FALSE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
index 9af0561..3cd58e8 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/StreamerParityTest.cs
@@ -29,8 +29,7 @@ namespace Apache.Ignite.Core.Tests.ApiParity
         /** Members that are not needed on .NET side. */
         private static readonly string[] UnneededMembers =
         {
-            "deployClass",
-            "perThreadBufferSize"
+            "deployClass"
         };
 
         /** Known name mappings. */
@@ -52,4 +51,4 @@ namespace Apache.Ignite.Core.Tests.ApiParity
                 UnneededMembers, knownMappings: KnownMappings);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index 60a1067..a3c804d 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -115,6 +115,12 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 Assert.AreEqual(1, ldr.PerNodeBufferSize);
                 ldr.PerNodeBufferSize = 2;
                 Assert.AreEqual(2, ldr.PerNodeBufferSize);
+                
+                
Assert.AreEqual(DataStreamerDefaults.DefaultPerThreadBufferSize, 
ldr.PerThreadBufferSize);
+                ldr.PerThreadBufferSize = 1;
+                Assert.AreEqual(1, ldr.PerThreadBufferSize);
+                ldr.PerThreadBufferSize = 2;
+                Assert.AreEqual(2, ldr.PerThreadBufferSize);
 
                 Assert.AreEqual(0, ldr.PerNodeParallelOperations);
                 var ops = 
DataStreamerDefaults.DefaultParallelOperationsMultiplier *
@@ -271,6 +277,7 @@ namespace Apache.Ignite.Core.Tests.Dataload
                 Assert.IsFalse(task.IsCompleted);
 
                 ldr.PerNodeBufferSize = 2;
+                ldr.PerThreadBufferSize = 1;
 
                 ldr.AddData(part2[0], part2[0]);
                 ldr.AddData(part1[1], part1[1]);

http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
index 315ae7f..aa0d114 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs
@@ -28,6 +28,11 @@ namespace Apache.Ignite.Core.Datastream
         /// The default per node buffer size, see <see 
cref="IDataStreamer{TK,TV}.PerNodeBufferSize"/>.
         /// </summary>
         public const int DefaultPerNodeBufferSize = 512;
+        
+        /// <summary>
+        /// The default per thread buffer size, see <see cref="IDataStreamer{TK,TV}.PerThreadBufferSize"/>.
+        /// </summary>
+        public const int DefaultPerThreadBufferSize = 4096;
 
         /// <summary>
         /// Default multiplier for parallel operations per node:

http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
index 277130c..9ba0193 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs
@@ -52,6 +52,13 @@ namespace Apache.Ignite.Core.Datastream
     ///         remote node. Default value is 1024.</description>
     ///     </item>
     ///     <item>
+    ///         <term>PerThreadBufferSize</term>
+    ///         <description>When entries are added to the data streamer they are not sent to Ignite
+    ///         right away; they are buffered internally on a per-thread basis for better performance and
+    ///         network utilization. This setting controls the size of the internal per-thread buffer before
+    ///         the buffered data is sent to the remote node. Default value is 4096.</description>
+    ///     </item>
+    ///     <item>
     ///         <term>PerNodeParallelOperations</term>
     ///         <description>Sometimes data may be added to the data streamer 
faster than it can be put 
     ///         in cache. In this case, new buffered load messages are sent to 
remote nodes before 
@@ -117,6 +124,16 @@ namespace Apache.Ignite.Core.Datastream
         int PerNodeBufferSize { get; set; }
 
         /// <summary>
+        /// Size of per thread key-value pairs buffer.
+        /// <para />
+        /// Setter must be called before any add/remove operation.
+        /// <para />
+        /// Default is <see cref="DataStreamerDefaults.DefaultPerThreadBufferSize"/>.
+        /// </summary>
+        [DefaultValue(DataStreamerDefaults.DefaultPerThreadBufferSize)]
+        int PerThreadBufferSize { get; set; }
+
+        /// <summary>
         /// Maximum number of parallel load operations for a single node.
         /// <para />
         /// Setter must be called before any add/remove operation.

http://git-wip-us.apache.org/repos/asf/ignite/blob/62932706/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 7aaa84a..755d1f1 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -99,6 +99,12 @@ namespace Apache.Ignite.Core.Impl.Datastream
         /** */
         private const int OpSetTimeout = 13;
 
+        /** */
+        private const int OpPerThreadBufferSize = 14;
+
+        /** */
+        private const int OpSetPerThreadBufferSize = 15;
+
         /** Cache name. */
         private readonly string _cacheName;
 
@@ -283,6 +289,41 @@ namespace Apache.Ignite.Core.Impl.Datastream
                 }
             }
         }
+        
+        /** <inheritDoc /> */
+        public int PerThreadBufferSize
+        {
+            get
+            {
+                _rwLock.EnterReadLock(); // Getter takes the read side of the lock; the setter below takes the write side.
+                
+                try
+                {
+                    ThrowIfDisposed(); // Defined outside this hunk; presumably rejects use of a closed streamer - confirm.
+
+                    return (int) DoOutInOp(OpPerThreadBufferSize); // Op 14; Java side answers with ldr.perThreadBufferSize().
+                }
+                finally
+                {
+                    _rwLock.ExitReadLock(); // Released in finally so the lock is freed even when the call above throws.
+                }
+            }
+            set
+            {
+                _rwLock.EnterWriteLock(); 
+                
+                try
+                {
+                    ThrowIfDisposed(); // Same disposed check as in the getter.
+
+                    DoOutInOp(OpSetPerThreadBufferSize, value); // Op 15; Java side narrows the value with an (int) cast.
+                }
+                finally
+                {
+                    _rwLock.ExitWriteLock(); // Released in finally so the lock is freed even when the call above throws.
+                }
+            }
+        }
 
         /** <inheritDoc /> */
         public int PerNodeParallelOperations

Reply via email to