Repository: ignite Updated Branches: refs/heads/master 5ca7909fd -> 5764960e8
IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout This closes #2785 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5764960e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5764960e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5764960e Branch: refs/heads/master Commit: 5764960e802e91b87956f4515e289eaf0003a2de Parents: 5ca7909 Author: Pavel Tupitsyn <[email protected]> Authored: Mon Oct 2 16:48:23 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Mon Oct 2 16:48:23 2017 +0300 ---------------------------------------------------------------------- .../datastreamer/PlatformDataStreamer.java | 14 ++++++ .../Dataload/DataStreamerTest.cs | 50 +++++++++++++++++--- .../Apache.Ignite.Core.csproj | 1 + .../Datastream/DataStreamerDefaults.cs | 46 ++++++++++++++++++ .../Datastream/IDataStreamer.cs | 21 +++++++- .../Impl/Binary/BinaryReaderExtensions.cs | 10 +--- .../Impl/Binary/BinaryUtils.cs | 14 ++++++ .../Impl/Datastream/DataStreamerImpl.cs | 43 ++++++++++++++++- 8 files changed, 179 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index fba0a4c..8cd14c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -86,6 +86,12 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { /** */ private static final int OP_LISTEN_TOPOLOGY = 11; + /** */ + private static final int OP_GET_TIMEOUT = 12; + + /** */ + private static final int OP_SET_TIMEOUT = 13; + /** Cache name. */ private final String cacheName; @@ -230,6 +236,14 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { case OP_PER_NODE_PARALLEL_OPS: return ldr.perNodeParallelOperations(); + + case OP_GET_TIMEOUT: + return ldr.timeout(); + + case OP_SET_TIMEOUT: + ldr.timeout(val); + + return TRUE; } return super.processInLongOutLong(type, val); http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs index fe5955f..60a1067 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs @@ -95,25 +95,40 @@ namespace Apache.Ignite.Core.Tests.Dataload { using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName)) { + Assert.AreEqual(CacheName, ldr.CacheName); + Assert.AreEqual(0, ldr.AutoFlushFrequency); + + Assert.IsFalse(ldr.AllowOverwrite); ldr.AllowOverwrite = true; Assert.IsTrue(ldr.AllowOverwrite); ldr.AllowOverwrite = false; Assert.IsFalse(ldr.AllowOverwrite); + Assert.IsFalse(ldr.SkipStore); ldr.SkipStore = true; Assert.IsTrue(ldr.SkipStore); ldr.SkipStore = false; Assert.IsFalse(ldr.SkipStore); + Assert.AreEqual(DataStreamerDefaults.DefaultPerNodeBufferSize, ldr.PerNodeBufferSize); ldr.PerNodeBufferSize = 1; Assert.AreEqual(1, ldr.PerNodeBufferSize); ldr.PerNodeBufferSize = 2; Assert.AreEqual(2, ldr.PerNodeBufferSize); - ldr.PerNodeParallelOperations = 1; - Assert.AreEqual(1, ldr.PerNodeParallelOperations); + Assert.AreEqual(0, ldr.PerNodeParallelOperations); + var ops = DataStreamerDefaults.DefaultParallelOperationsMultiplier * + IgniteConfiguration.DefaultThreadPoolSize; + ldr.PerNodeParallelOperations = ops; + Assert.AreEqual(ops, ldr.PerNodeParallelOperations); ldr.PerNodeParallelOperations = 2; Assert.AreEqual(2, ldr.PerNodeParallelOperations); + + Assert.AreEqual(DataStreamerDefaults.DefaultTimeout, ldr.Timeout); + ldr.Timeout = TimeSpan.MaxValue; + Assert.AreEqual(TimeSpan.MaxValue, ldr.Timeout); + ldr.Timeout = TimeSpan.FromSeconds(1.5); + Assert.AreEqual(1.5, ldr.Timeout.TotalSeconds); } } @@ -123,28 +138,37 @@ namespace Apache.Ignite.Core.Tests.Dataload [Test] public void TestAddRemove() { - using (IDataStreamer<int, int> ldr = _grid.GetDataStreamer<int, int>(CacheName)) + IDataStreamer<int, int> ldr; + + using (ldr = _grid.GetDataStreamer<int, int>(CacheName)) { + Assert.IsFalse(ldr.Task.IsCompleted); + ldr.AllowOverwrite = true; // Additions. - ldr.AddData(1, 1); + var task = ldr.AddData(1, 1); ldr.Flush(); Assert.AreEqual(1, _cache.Get(1)); + Assert.IsTrue(task.IsCompleted); + Assert.IsFalse(ldr.Task.IsCompleted); - ldr.AddData(new KeyValuePair<int, int>(2, 2)); + task = ldr.AddData(new KeyValuePair<int, int>(2, 2)); ldr.Flush(); Assert.AreEqual(2, _cache.Get(2)); + Assert.IsTrue(task.IsCompleted); - ldr.AddData(new List<KeyValuePair<int, int>> { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) }); + task = ldr.AddData(new [] { new KeyValuePair<int, int>(3, 3), new KeyValuePair<int, int>(4, 4) }); ldr.Flush(); Assert.AreEqual(3, _cache.Get(3)); Assert.AreEqual(4, _cache.Get(4)); + Assert.IsTrue(task.IsCompleted); // Removal. - ldr.RemoveData(1); + task = ldr.RemoveData(1); ldr.Flush(); Assert.IsFalse(_cache.ContainsKey(1)); + Assert.IsTrue(task.IsCompleted); // Mixed. ldr.AddData(5, 5); @@ -165,6 +189,8 @@ namespace Apache.Ignite.Core.Tests.Dataload for (int i = 5; i < 13; i++) Assert.AreEqual(i, _cache.Get(i)); } + + Assert.IsTrue(ldr.Task.IsCompleted); } /// <summary> @@ -517,6 +543,16 @@ namespace Apache.Ignite.Core.Tests.Dataload for (var i = 0; i < 100; i++) Assert.AreEqual(i + 1, cache.Get(i).Val); + + // Repeating WithKeepBinary call: valid args. + Assert.AreSame(ldr, ldr.WithKeepBinary<int, IBinaryObject>()); + + // Invalid type args. + var ex = Assert.Throws<InvalidOperationException>(() => ldr.WithKeepBinary<string, IBinaryObject>()); + + Assert.AreEqual( + "Can't change type of binary streamer. WithKeepBinary has been called on an instance of " + + "binary streamer with incompatible generic arguments.", ex.Message); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 67c540c..58abd26 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -104,6 +104,7 @@ <Compile Include="Common\ExceptionFactory.cs" /> <Compile Include="Configuration\Package-Info.cs" /> <Compile Include="Configuration\ClientConnectorConfiguration.cs" /> + <Compile Include="Datastream\DataStreamerDefaults.cs" /> <Compile Include="Impl\Binary\BinaryTypeId.cs" /> <Compile Include="Impl\Client\Cache\CacheFlags.cs" /> <Compile Include="Impl\Client\Cache\Query\ClientQueryCursor.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs new file mode 100644 index 0000000..315ae7f --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Datastream +{ + using System; + + /// <summary> + /// Data streamer configuration defaults. + /// </summary> + public static class DataStreamerDefaults + { + /// <summary> + /// The default per node buffer size, see <see cref="IDataStreamer{TK,TV}.PerNodeBufferSize"/>. + /// </summary> + public const int DefaultPerNodeBufferSize = 512; + + /// <summary> + /// Default multiplier for parallel operations per node: + /// <see cref="IDataStreamer{TK,TV}.PerNodeParallelOperations"/> = + /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> * + /// <see cref="DefaultParallelOperationsMultiplier"/>. + /// </summary> + public const int DefaultParallelOperationsMultiplier = 8; + + /// <summary> + /// The default timeout (see <see cref="IDataStreamer{TK,TV}.Timeout"/>). + /// Negative value means no timeout. + /// </summary> + public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(-1); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs index 222f6c3..277130c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs @@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Datastream { using System; using System.Collections.Generic; + using System.ComponentModel; using System.Threading.Tasks; using Apache.Ignite.Core.Cache.Store; @@ -110,8 +111,9 @@ namespace Apache.Ignite.Core.Datastream /// <para /> /// Setter must be called before any add/remove operation. /// <para /> - /// Default is <c>1024</c>. + /// Default is <see cref="DataStreamerDefaults.DefaultPerNodeBufferSize"/>. /// </summary> + [DefaultValue(DataStreamerDefaults.DefaultPerNodeBufferSize)] int PerNodeBufferSize { get; set; } /// <summary> @@ -119,7 +121,9 @@ namespace Apache.Ignite.Core.Datastream /// <para /> /// Setter must be called before any add/remove operation. /// <para /> - /// Default is <c>16</c>. + /// Default is 0, which means Ignite calculates this automatically as + /// <see cref="IgniteConfiguration.DataStreamerThreadPoolSize"/> * + /// <see cref="DataStreamerDefaults.DefaultParallelOperationsMultiplier"/>. /// </summary> int PerNodeParallelOperations { get; set; } @@ -208,5 +212,18 @@ namespace Apache.Ignite.Core.Datastream /// <typeparam name="TV1">Value type in binary mode.</typeparam> /// <returns>Streamer instance with binary mode enabled.</returns> IDataStreamer<TK1, TV1> WithKeepBinary<TK1, TV1>(); + + /// <summary> + /// Gets or sets the timeout. Negative values mean no timeout. + /// Default is <see cref="DataStreamerDefaults.DefaultTimeout"/>. + /// <para /> + /// Timeout is used in the following cases: + /// <li>Any data addition method can be blocked when all per node parallel operations are exhausted. + /// The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data + /// into the streamer;</li> + /// <li>Total timeout time for <see cref="Flush"/> operation;</li> + /// <li>Total timeout time for <see cref="Close"/> operation.</li> + /// </summary> + TimeSpan Timeout { get; set; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs index 7556c41..da87d21 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs @@ -58,15 +58,7 @@ namespace Apache.Ignite.Core.Impl.Binary /// <returns>TimeSpan.</returns> public static TimeSpan ReadLongAsTimespan(this IBinaryRawReader reader) { - long ms = reader.ReadLong(); - - if (ms >= TimeSpan.MaxValue.TotalMilliseconds) - return TimeSpan.MaxValue; - - if (ms <= TimeSpan.MinValue.TotalMilliseconds) - return TimeSpan.MinValue; - - return TimeSpan.FromMilliseconds(ms); + return BinaryUtils.LongToTimeSpan(reader.ReadLong()); } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs index 46e6752..139783d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs @@ -1664,6 +1664,20 @@ namespace Apache.Ignite.Core.Impl.Binary } /// <summary> + /// Converts long to timespan. + /// </summary> + public static TimeSpan LongToTimeSpan(long ms) + { + if (ms >= TimeSpan.MaxValue.TotalMilliseconds) + return TimeSpan.MaxValue; + + if (ms <= TimeSpan.MinValue.TotalMilliseconds) + return TimeSpan.MinValue; + + return TimeSpan.FromMilliseconds(ms); + } + + /// <summary> /// Creates and instance from the type name in reader. /// </summary> private static T CreateInstance<T>(BinaryReader reader) http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs index 96b24ab..555c6e6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -93,6 +93,12 @@ namespace Apache.Ignite.Core.Impl.Datastream /** */ private const int OpListenTopology = 11; + /** */ + private const int OpGetTimeout = 12; + + /** */ + private const int OpSetTimeout = 13; + /** Cache name. */ private readonly string _cacheName; @@ -356,8 +362,6 @@ namespace Apache.Ignite.Core.Impl.Datastream { get { - ThrowIfDisposed(); - return _closeFut.Task; } } @@ -549,6 +553,41 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** <inheritDoc /> */ + public TimeSpan Timeout + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return BinaryUtils.LongToTimeSpan(DoOutInOp(OpGetTimeout)); + } + finally + { + _rwLock.ExitReadLock(); + } + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + DoOutInOp(OpSetTimeout, (long) value.TotalMilliseconds); + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** <inheritDoc /> */ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] protected override void Dispose(bool disposing) {
