http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs new file mode 100644 index 0000000..680228d --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using System; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Datastream; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl.Cache; + using Apache.Ignite.Core.Impl.Datastream; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Messaging; + + /// <summary> + /// Type descriptor with precompiled delegates for known methods. + /// </summary> + internal class DelegateTypeDescriptor + { + /** Cached decriptors. */ + private static readonly CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor> Descriptors + = new CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor>(); + + /** */ + private readonly Func<object, object> _computeOutFunc; + + /** */ + private readonly Func<object, object, object> _computeFunc; + + /** */ + private readonly Func<object, Guid, object, bool> _eventFilter; + + /** */ + private readonly Func<object, object, object, bool> _cacheEntryFilter; + + /** */ + private readonly Func<object, object, object, byte, bool> _cacheDrEntryFilter; + + /** */ + private readonly Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>> + _cacheEntryProcessor; + + /** */ + private readonly Func<object, Guid, object, bool> _messageFilter; + + /** */ + private readonly Func<object, object> _computeJobExecute; + + /** */ + private readonly Action<object> _computeJobCancel; + + /** */ + private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _streamReceiver; + + /** */ + private readonly Func<object, object> _streamTransformerCtor; + + /// <summary> + /// Gets the <see cref="IComputeFunc{T}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, object> GetComputeOutFunc(Type type) + { + return Get(type)._computeOutFunc; + } + + /// <summary> + /// Gets the <see cref="IComputeFunc{T, R}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, object, object> GetComputeFunc(Type type) + { + return Get(type)._computeFunc; + } + + /// <summary> + /// Gets the <see cref="IEventFilter{T}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, Guid, object, bool> GetEventFilter(Type type) + { + return Get(type)._eventFilter; + } + + /// <summary> + /// Gets the <see cref="ICacheEntryFilter{TK,TV}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, object, object, bool> GetCacheEntryFilter(Type type) + { + return Get(type)._cacheEntryFilter; + } + + /// <summary> + /// Gets the <see cref="ICacheDrEntryFilter{K, V}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, object, object, byte, bool> GetCacheDrEntryFilter(Type type) + { + return Get(type)._cacheDrEntryFilter; + } + + /// <summary> + /// Gets the <see cref="ICacheEntryProcessor{K, V, A, R}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, IMutableCacheEntryInternal, object, object> GetCacheEntryProcessor(Type type) + { + return Get(type)._cacheEntryProcessor.Item1; + } + + /// <summary> + /// Gets key and value types for the <see cref="ICacheEntryProcessor{K, V, A, R}" />. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Key and value types.</returns> + public static Tuple<Type, Type> GetCacheEntryProcessorTypes(Type type) + { + return Get(type)._cacheEntryProcessor.Item2; + } + + /// <summary> + /// Gets the <see cref="IMessageFilter{T}" /> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, Guid, object, bool> GetMessageFilter(Type type) + { + return Get(type)._messageFilter; + } + + /// <summary> + /// Gets the <see cref="IComputeJob{T}.Execute" /> and <see cref="IComputeJob{T}.Cancel" /> invocators. + /// </summary> + /// <param name="type">Type.</param> + /// <param name="execute">Execute invocator.</param> + /// <param name="cancel">Cancel invocator.</param> + public static void GetComputeJob(Type type, out Func<object, object> execute, out Action<object> cancel) + { + var desc = Get(type); + + execute = desc._computeJobExecute; + cancel = desc._computeJobCancel; + } + + /// <summary> + /// Gets the <see cref="IStreamReceiver{TK,TV}"/> invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> GetStreamReceiver(Type type) + { + return Get(type)._streamReceiver; + } + + /// <summary> + /// Gets the <see cref="StreamTransformer{K, V, A, R}"/>> ctor invocator. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Precompiled invocator delegate.</returns> + public static Func<object, object> GetStreamTransformerCtor(Type type) + { + return Get(type)._streamTransformerCtor; + } + + /// <summary> + /// Gets the <see cref="DelegateTypeDescriptor" /> by type. + /// </summary> + private static DelegateTypeDescriptor Get(Type type) + { + DelegateTypeDescriptor result; + + return Descriptors.TryGetValue(type, out result) + ? result + : Descriptors.GetOrAdd(type, t => new DelegateTypeDescriptor(t)); + } + + /// <summary> + /// Throws an exception if first argument is not null. + /// </summary> + // ReSharper disable once UnusedParameter.Local + private static void ThrowIfMultipleInterfaces(object check, Type userType, Type interfaceType) + { + if (check != null) + throw new InvalidOperationException( + string.Format("Not Supported: Type {0} implements interface {1} multiple times.", userType, + interfaceType)); + } + + /// <summary> + /// Initializes a new instance of the <see cref="DelegateTypeDescriptor"/> class. + /// </summary> + /// <param name="type">The type.</param> + private DelegateTypeDescriptor(Type type) + { + foreach (var iface in type.GetInterfaces()) + { + if (!iface.IsGenericType) + continue; + + var genericTypeDefinition = iface.GetGenericTypeDefinition(); + + if (genericTypeDefinition == typeof (IComputeFunc<>)) + { + ThrowIfMultipleInterfaces(_computeOutFunc, type, typeof(IComputeFunc<>)); + + _computeOutFunc = DelegateConverter.CompileFunc(iface); + } + else if (genericTypeDefinition == typeof (IComputeFunc<,>)) + { + ThrowIfMultipleInterfaces(_computeFunc, type, typeof(IComputeFunc<,>)); + + var args = iface.GetGenericArguments(); + + _computeFunc = DelegateConverter.CompileFunc<Func<object, object, object>>(iface, new[] {args[0]}); + } + else if (genericTypeDefinition == typeof (IEventFilter<>)) + { + ThrowIfMultipleInterfaces(_eventFilter, type, typeof(IEventFilter<>)); + + var args = iface.GetGenericArguments(); + + _eventFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface, + new[] {typeof (Guid), args[0]}, new[] {false, true, false}); + } + else if (genericTypeDefinition == typeof (ICacheEntryFilter<,>)) + { + ThrowIfMultipleInterfaces(_cacheEntryFilter, type, typeof(ICacheEntryFilter<,>)); + + var args = iface.GetGenericArguments(); + + var entryType = typeof (ICacheEntry<,>).MakeGenericType(args); + + var invokeFunc = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface, + new[] { entryType }, new[] { true, false }); + + var ctor = DelegateConverter.CompileCtor<Func<object, object, object>>( + typeof (CacheEntry<,>).MakeGenericType(args), args); + + // Resulting func constructs CacheEntry and passes it to user implementation + _cacheEntryFilter = (obj, k, v) => invokeFunc(obj, ctor(k, v)); + } + else if (genericTypeDefinition == typeof (ICacheEntryProcessor<,,,>)) + { + ThrowIfMultipleInterfaces(_cacheEntryProcessor, type, typeof(ICacheEntryProcessor<,,,>)); + + var args = iface.GetGenericArguments(); + + var entryType = typeof (IMutableCacheEntry<,>).MakeGenericType(args[0], args[1]); + + var func = DelegateConverter.CompileFunc<Func<object, object, object, object>>(iface, + new[] { entryType, args[2] }, null, "Process"); + + var types = new Tuple<Type, Type>(args[0], args[1]); + + _cacheEntryProcessor = new Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>> + (func, types); + + var transformerType = typeof (StreamTransformer<,,,>).MakeGenericType(args); + + _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType, + new[] {iface}); + } + else if (genericTypeDefinition == typeof (IMessageFilter<>)) + { + ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>)); + + var arg = iface.GetGenericArguments()[0]; + + _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface, + new[] { typeof(Guid), arg }, new[] { false, true, false }); + } + else if (genericTypeDefinition == typeof (IComputeJob<>)) + { + ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IComputeJob<>)); + + _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0], + methodName: "Execute"); + + _computeJobCancel = DelegateConverter.CompileFunc<Action<object>>(iface, new Type[0], + new[] {false}, "Cancel"); + } + else if (genericTypeDefinition == typeof (IStreamReceiver<,>)) + { + ThrowIfMultipleInterfaces(_streamReceiver, type, typeof (IStreamReceiver<,>)); + + var method = + typeof (StreamReceiverHolder).GetMethod("InvokeReceiver") + .MakeGenericMethod(iface.GetGenericArguments()); + + _streamReceiver = DelegateConverter + .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool>>( + typeof (StreamReceiverHolder), + method, + new[] + { + iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IPortableStream), + typeof (bool) + }, + new[] {true, false, false, false, false, false}); + } + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs index c62cfd2..0bbc1a2 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs @@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Common using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Portable.IO; @@ -133,7 +132,7 @@ namespace Apache.Ignite.Core.Impl.Common /** <inheritdoc/> */ public void Listen(Action<IFuture<T>> callback) { - GridArgumentCheck.NotNull(callback, "callback"); + IgniteArgumentCheck.NotNull(callback, "callback"); if (!_done) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs new file mode 100644 index 0000000..a07d954 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using System; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + + /// <summary> + /// Marshals and converts future value. + /// </summary> + internal class FutureConverter<T> : IFutureConverter<T> + { + /** Marshaller. */ + private readonly PortableMarshaller _marsh; + + /** Keep portable flag. */ + private readonly bool _keepPortable; + + /** Converting function. */ + private readonly Func<PortableReaderImpl, T> _func; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="marsh">Marshaller.</param> + /// <param name="keepPortable">Keep portable.</param> + /// <param name="func">Converting function.</param> + public FutureConverter(PortableMarshaller marsh, bool keepPortable, + Func<PortableReaderImpl, T> func = null) + { + _marsh = marsh; + _keepPortable = keepPortable; + _func = func ?? (reader => reader.ReadObject<T>()); + } + + /// <summary> + /// Read and convert a value. + /// </summary> + public T Convert(IPortableStream stream) + { + var reader = _marsh.StartUnmarshal(stream, _keepPortable); + + return _func(reader); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs deleted file mode 100644 index a1fadfe..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Common -{ - using System; - using System.Collections.Generic; - - /// <summary> - /// Arguments check helpers. - /// </summary> - public static class GridArgumentCheck - { - /// <summary> - /// Throws an ArgumentNullException if specified arg is null. - /// </summary> - /// <param name="arg">The argument.</param> - /// <param name="argName">Name of the argument.</param> - public static void NotNull(object arg, string argName) - { - if (arg == null) - throw new ArgumentNullException(argName); - } - - /// <summary> - /// Throws an ArgumentException if specified arg is null or empty string. - /// </summary> - /// <param name="arg">The argument.</param> - /// <param name="argName">Name of the argument.</param> - public static void NotNullOrEmpty(string arg, string argName) - { - if (string.IsNullOrEmpty(arg)) - throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName), - argName); - } - - /// <summary> - /// Throws an ArgumentException if specified arg is null or empty string. - /// </summary> - /// <param name="collection">The collection.</param> - /// <param name="argName">Name of the argument.</param> - public static void NotNullOrEmpty<T>(ICollection<T> collection, string argName) - { - if (collection == null || collection.Count == 0) - throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName), - argName); - } - - /// <summary> - /// Throws an ArgumentException if specified condition is false. - /// </summary> - /// <param name="condition">Condition.</param> - /// <param name="argName">Name of the argument.</param> - /// <param name="message">Message.</param> - public static void Ensure(bool condition, string argName, string message) - { - if (!condition) - throw new ArgumentException(string.Format("'{0}' argument is invalid: {1}", argName, message), - argName); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs new file mode 100644 index 0000000..e94c577 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using System; + using System.Collections.Generic; + + /// <summary> + /// Arguments check helpers. + /// </summary> + public static class IgniteArgumentCheck + { + /// <summary> + /// Throws an ArgumentNullException if specified arg is null. + /// </summary> + /// <param name="arg">The argument.</param> + /// <param name="argName">Name of the argument.</param> + public static void NotNull(object arg, string argName) + { + if (arg == null) + throw new ArgumentNullException(argName); + } + + /// <summary> + /// Throws an ArgumentException if specified arg is null or empty string. + /// </summary> + /// <param name="arg">The argument.</param> + /// <param name="argName">Name of the argument.</param> + public static void NotNullOrEmpty(string arg, string argName) + { + if (string.IsNullOrEmpty(arg)) + throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName), + argName); + } + + /// <summary> + /// Throws an ArgumentException if specified arg is null or empty string. + /// </summary> + /// <param name="collection">The collection.</param> + /// <param name="argName">Name of the argument.</param> + public static void NotNullOrEmpty<T>(ICollection<T> collection, string argName) + { + if (collection == null || collection.Count == 0) + throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName), + argName); + } + + /// <summary> + /// Throws an ArgumentException if specified condition is false. + /// </summary> + /// <param name="condition">Condition.</param> + /// <param name="argName">Name of the argument.</param> + /// <param name="message">Message.</param> + public static void Ensure(bool condition, string argName, string message) + { + if (!condition) + throw new ArgumentException(string.Format("'{0}' argument is invalid: {1}", argName, message), + argName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs new file mode 100644 index 0000000..733bed0 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Simple wrapper over result to handle marshalling properly. + /// </summary> + internal class PortableResultWrapper : IPortableWriteAware + { + /** */ + private readonly object _result; + + /// <summary> + /// Initializes a new instance of the <see cref="PortableResultWrapper"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public PortableResultWrapper(IPortableReader reader) + { + var reader0 = (PortableReaderImpl)reader.RawReader(); + + _result = PortableUtils.ReadPortableOrSerializable<object>(reader0); + } + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="res">Result.</param> + public PortableResultWrapper(object res) + { + _result = res; + } + + /// <summary> + /// Result. + /// </summary> + public object Result + { + get { return _result; } + } + + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + var writer0 = (PortableWriterImpl) writer.RawWriter(); + + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, Result); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs new file mode 100644 index 0000000..1a772c2 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System; + using System.Collections.Generic; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Base class for all tasks working with closures. + /// </summary> + internal abstract class ComputeAbstractClosureTask<TA, T, TR> : IComputeTask<TA, T, TR> + { + /// <summary> + /// This method is called to map or split Ignite task into multiple Ignite jobs. This is the + /// first method that gets called when task execution starts. + /// </summary> + /// <param name="subgrid">Nodes available for this task execution. Note that order of nodes is + /// guaranteed to be randomized by container. This ensures that every time you simply iterate + /// through Ignite nodes, the order of nodes will be random which over time should result into + /// all nodes being used equally.</param> + /// <param name="arg">Task execution argument. Can be <c>null</c>. This is the same argument + /// as the one passed into <c>ICompute.Execute()</c> methods.</param> + /// <returns> + /// Map of Ignite jobs assigned to subgrid node. If <c>null</c> or empty map is returned, + /// exception will be thrown. + /// </returns> + /// <exception cref="System.NotSupportedException">Map step should not be called on this task.</exception> + public IDictionary<IComputeJob<T>, IClusterNode> Map(IList<IClusterNode> subgrid, TA arg) + { + throw new NotSupportedException("Map step should not be called on this task."); + } + + /// <summary> + /// Asynchronous callback invoked every time a result from remote execution is + /// received. It is ultimately upto this method to return a policy based + /// on which the system will either wait for more results, reduce results + /// received so far, or failover this job to another node. See + /// <see cref="ComputeJobResultPolicy" /> for more information. + /// </summary> + /// <param name="res">Received remote Ignite executable result.</param> + /// <param name="rcvd">All previously received results. Note that if task class has + /// <see cref="ComputeTaskNoResultCacheAttribute" /> attribute, then this list will be empty.</param> + /// <returns> + /// Result policy that dictates how to process further upcoming job results. + /// </returns> + public ComputeJobResultPolicy Result(IComputeJobResult<T> res, IList<IComputeJobResult<T>> rcvd) + { + Exception err = res.Exception(); + + if (err != null) + { + if (err is ComputeExecutionRejectedException || err is ClusterTopologyException || + err is ComputeJobFailoverException) + return ComputeJobResultPolicy.Failover; + + throw err; + } + + return Result0(res); + } + + /// <summary> + /// Reduces (or aggregates) results received so far into one compound result to be returned to + /// caller via future. + /// <para /> + /// Note, that if some jobs did not succeed and could not be failed over then the list of + /// results passed into this method will include the failed results. Otherwise, failed + /// results will not be in the list. + /// </summary> + /// <param name="results">Received job results. Note that if task class has + /// <see cref="ComputeTaskNoResultCacheAttribute" /> attribute, then this list will be empty.</param> + /// <returns> + /// Task result constructed from results of remote executions. + /// </returns> + public abstract TR Reduce(IList<IComputeJobResult<T>> results); + + /// <summary> + /// Internal result processing routine. + /// </summary> + /// <param name="res">Result.</param> + /// <returns>Policy.</returns> + protected abstract ComputeJobResultPolicy Result0(IComputeJobResult<T> res); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs new file mode 100644 index 0000000..c91a167 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// System job which wraps over <c>Action</c>. + /// </summary> + internal class ComputeActionJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware + { + /** Closure. */ + private readonly IComputeAction _action; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="action">Action.</param> + public ComputeActionJob(IComputeAction action) + { + _action = action; + } + + /** <inheritDoc /> */ + public object Execute() + { + _action.Invoke(); + + return null; + } + + /** <inheritDoc /> */ + public void Cancel() + { + throw new NotSupportedException("Func job cannot be cancelled."); + } + + /** <inheritDoc /> */ + public void Inject(Ignite grid) + { + ResourceProcessor.Inject(_action, grid); + } + + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + var writer0 = (PortableWriterImpl)writer.RawWriter(); + + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, _action); + } + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeActionJob"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public ComputeActionJob(IPortableReader reader) + { + var reader0 = (PortableReaderImpl)reader.RawReader(); + + _action = PortableUtils.ReadPortableOrSerializable<IComputeAction>(reader0); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs new file mode 100644 index 0000000..381c701 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// System job which wraps over <c>Func</c>. + /// </summary> + internal class ComputeFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware + { + /** Closure. */ + private readonly IComputeFunc _clo; + + /** Argument. */ + private readonly object _arg; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="clo">Closure.</param> + /// <param name="arg">Argument.</param> + public ComputeFuncJob(IComputeFunc clo, object arg) + { + _clo = clo; + _arg = arg; + } + + /** <inheritDoc /> */ + public object Execute() + { + return _clo.Invoke(_arg); + } + + /** <inheritDoc /> */ + public void Cancel() + { + throw new NotSupportedException("Func job cannot be cancelled."); + } + + /** <inheritDoc /> */ + public void Inject(Ignite grid) + { + ResourceProcessor.Inject(_clo, grid); + } + + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + PortableWriterImpl writer0 = (PortableWriterImpl) writer.RawWriter(); + + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, _clo); + + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, _arg); + } + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeFuncJob"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public ComputeFuncJob(IPortableReader reader) + { + var reader0 = (PortableReaderImpl) reader.RawReader(); + + _clo = PortableUtils.ReadPortableOrSerializable<IComputeFunc>(reader0); + _arg = PortableUtils.ReadPortableOrSerializable<object>(reader0); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs new file mode 100644 index 0000000..dd57f6c --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System.Collections.Generic; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Closure-based task producing multiple jobs and returning a collection of job results. + /// </summary> + [ComputeTaskNoResultCache] + internal class ComputeMultiClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR> + where TR : ICollection<T> + { + /** Result. */ + private readonly ICollection<T> _res; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="size">Expected results count.</param> + public ComputeMultiClosureTask(int size) + { + _res = new List<T>(size); + } + + /** <inheritDoc /> */ + protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res) + { + _res.Add(res.Data()); + + return ComputeJobResultPolicy.Wait; + } + + /** <inheritDoc /> */ + public override TR Reduce(IList<IComputeJobResult<T>> results) + { + return (TR) _res; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs new file mode 100644 index 0000000..5f719cd --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// System job which wraps over <c>Func</c>. + /// </summary> + internal class ComputeOutFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware + { + /** Closure. */ + private readonly IComputeOutFunc _clo; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="clo">Closure.</param> + public ComputeOutFuncJob(IComputeOutFunc clo) + { + _clo = clo; + } + + /** <inheritDoc /> */ + public object Execute() + { + return _clo.Invoke(); + } + + /** <inheritDoc /> */ + public void Cancel() + { + throw new NotSupportedException("Func job cannot be cancelled."); + } + + /** <inheritDoc /> */ + public void Inject(Ignite grid) + { + ResourceProcessor.Inject(_clo, grid); + } + + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + var writer0 = (PortableWriterImpl) writer.RawWriter(); + + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, _clo); + } + + public ComputeOutFuncJob(IPortableReader reader) + { + var reader0 = (PortableReaderImpl) reader.RawReader(); + + _clo = PortableUtils.ReadPortableOrSerializable<IComputeOutFunc>(reader0); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs new file mode 100644 index 0000000..a84d7ce --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System.Collections.Generic; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Resource; + + /// <summary> + /// Closure-based task producing only one job and thus having only single result. + /// </summary> + [ComputeTaskNoResultCache] + internal class ComputeReducingClosureTask<TA, T, TR> + : ComputeAbstractClosureTask<TA, T, TR>, IComputeResourceInjector + { + /** Reducer. */ + private readonly IComputeReducer<T, TR> _rdc; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="rdc">Reducer.</param> + public ComputeReducingClosureTask(IComputeReducer<T, TR> rdc) + { + _rdc = rdc; + } + + /** <inheritDoc /> */ + protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res) + { + return _rdc.Collect(res.Data()) ? ComputeJobResultPolicy.Wait : ComputeJobResultPolicy.Reduce; + } + + /** <inheritDoc /> */ + public override TR Reduce(IList<IComputeJobResult<T>> results) + { + return _rdc.Reduce(); + } + + /** <inheritDoc /> */ + public void Inject(Ignite grid) + { + ResourceProcessor.Inject(_rdc, grid); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs new file mode 100644 index 0000000..6e82c9b --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + using System.Collections.Generic; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Closure-based task producing only one job and thus having only single result. + /// </summary> + [ComputeTaskNoResultCache] + internal class ComputeSingleClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR> where TR : T + { + /** Result. */ + private TR _res; + + /** <inheritDoc /> */ + protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res) + { + _res = (TR) res.Data(); + + // No more results are expected at this point, but we prefer not to alter regular + // task flow. + return ComputeJobResultPolicy.Wait; + } + + /** <inheritDoc /> */ + public override TR Reduce(IList<IComputeJobResult<T>> results) + { + return _res; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs new file mode 100644 index 0000000..8d3e8d7 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute.Closure +{ + /// <summary> + /// Interface denoting entity which must perform custom resource injection. + /// </summary> + internal interface IComputeResourceInjector + { + /// <summary> + /// Inject resources. + /// </summary> + /// <param name="grid">Grid.</param> + void Inject(Ignite grid); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs new file mode 100644 index 0000000..7efabd1 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Synchronous Compute facade. + /// </summary> + internal class Compute : ICompute + { + /** */ + private readonly ComputeImpl _compute; + + /// <summary> + /// Initializes a new instance of the <see cref="Compute"/> class. + /// </summary> + /// <param name="computeImpl">The compute implementation.</param> + public Compute(ComputeImpl computeImpl) + { + Debug.Assert(computeImpl != null); + + _compute = computeImpl; + } + + /** <inheritDoc /> */ + public ICompute WithAsync() + { + return new ComputeAsync(_compute); + } + + /** <inheritDoc /> */ + public bool IsAsync + { + get { return false; } + } + + /** <inheritDoc /> */ + public IFuture GetFuture() + { + throw IgniteUtils.GetAsyncModeDisabledException(); + } + + /** <inheritDoc /> */ + public IFuture<TResult> GetFuture<TResult>() + { + throw IgniteUtils.GetAsyncModeDisabledException(); + } + + /** <inheritDoc /> */ + public IClusterGroup ClusterGroup + { + get { return _compute.ClusterGroup; } + } + + /** <inheritDoc /> */ + public ICompute WithNoFailover() + { + _compute.WithNoFailover(); + + return this; + } + + /** <inheritDoc /> */ + public ICompute WithTimeout(long timeout) + { + _compute.WithTimeout(timeout); + + return this; + } + + /** <inheritDoc /> */ + public ICompute WithKeepPortable() + { + _compute.WithKeepPortable(); + + return this; + } + + /** <inheritDoc /> */ + public T ExecuteJavaTask<T>(string taskName, object taskArg) + { + return _compute.ExecuteJavaTask<T>(taskName, taskArg); + } + + /** <inheritDoc /> */ + public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg) + { + return _compute.Execute(task, taskArg).Get(); + } + + /** <inheritDoc /> */ + public TR Execute<T, TR>(IComputeTask<T, TR> task) + { + return _compute.Execute(task, null).Get(); + } + + /** <inheritDoc /> */ + public TR Execute<TA, T, TR>(Type taskType, TA taskArg) + { + return _compute.Execute<TA, T, TR>(taskType, taskArg).Get(); + } + + public TR Execute<T, TR>(Type taskType) + { + return _compute.Execute<object, T, TR>(taskType, null).Get(); + } + + /** <inheritDoc /> */ + public TR Call<TR>(IComputeFunc<TR> clo) + { + return _compute.Execute(clo).Get(); + } + + /** <inheritDoc /> */ + public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo) + { + return _compute.AffinityCall(cacheName, affinityKey, clo).Get(); + } + + /** <inheritDoc /> */ + public TR Call<TR>(Func<TR> func) + { + return _compute.Execute(func).Get(); + } + + /** <inheritDoc /> */ + public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos) + { + return _compute.Execute(clos).Get(); + } + + /** <inheritDoc /> */ + public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc) + { + return _compute.Execute(clos, rdc).Get(); + } + + /** <inheritDoc /> */ + public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo) + { + return _compute.Broadcast(clo).Get(); + } + + /** <inheritDoc /> */ + public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg) + { + return _compute.Broadcast(clo, arg).Get(); + } + + /** <inheritDoc /> */ + public void Broadcast(IComputeAction action) + { + _compute.Broadcast(action).Get(); + } + + /** <inheritDoc /> */ + public void Run(IComputeAction action) + { + _compute.Run(action).Get(); + } + + /** <inheritDoc /> */ + public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) + { + _compute.AffinityRun(cacheName, affinityKey, action).Get(); + } + + /** <inheritDoc /> */ + public void Run(IEnumerable<IComputeAction> actions) + { + _compute.Run(actions).Get(); + } + + /** <inheritDoc /> */ + public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg) + { + return _compute.Apply(clo, arg).Get(); + } + + /** <inheritDoc /> */ + public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args) + { + return _compute.Apply(clo, args).Get(); + } + + /** <inheritDoc /> */ + public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc) + { + return _compute.Apply(clo, args, rdc).Get(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs new file mode 100644 index 0000000..199afc2 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Threading; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Asynchronous Compute facade. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] + internal class ComputeAsync : ICompute + { + /** */ + protected readonly ComputeImpl Compute; + + /** Current future. */ + private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>(); + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeAsync"/> class. + /// </summary> + /// <param name="computeImpl">The compute implementation.</param> + internal ComputeAsync(ComputeImpl computeImpl) + { + Compute = computeImpl; + } + + /** <inheritDoc /> */ + public ICompute WithAsync() + { + return this; + } + + /** <inheritDoc /> */ + public bool IsAsync + { + get { return true; } + } + + /** <inheritDoc /> */ + public IFuture GetFuture() + { + return GetFuture<object>(); + } + + /** <inheritDoc /> */ + public IFuture<TResult> GetFuture<TResult>() + { + var fut = _curFut.Value; + + if (fut == null) + throw new InvalidOperationException("Asynchronous operation not started."); + + var fut0 = fut as IFuture<TResult>; + + if (fut0 == null) + throw new InvalidOperationException( + string.Format("Requested future type {0} is incompatible with current future type {1}", + typeof(IFuture<TResult>), fut.GetType())); + + _curFut.Value = null; + + return fut0; + } + + /** <inheritDoc /> */ + public IClusterGroup ClusterGroup + { + get { return Compute.ClusterGroup; } + } + + /** <inheritDoc /> */ + public ICompute WithNoFailover() + { + Compute.WithNoFailover(); + + return this; + } + + /** <inheritDoc /> */ + public ICompute WithTimeout(long timeout) + { + Compute.WithTimeout(timeout); + + return this; + } + + /** <inheritDoc /> */ + public ICompute WithKeepPortable() + { + Compute.WithKeepPortable(); + + return this; + } + + /** <inheritDoc /> */ + public T ExecuteJavaTask<T>(string taskName, object taskArg) + { + _curFut.Value = Compute.ExecuteJavaTaskAsync<T>(taskName, taskArg); + + return default(T); + } + + /** <inheritDoc /> */ + public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg) + { + _curFut.Value = Compute.Execute(task, taskArg); + + return default(TR); + } + + /** <inheritDoc /> */ + public TR Execute<T, TR>(IComputeTask<T, TR> task) + { + _curFut.Value = Compute.Execute(task, null); + + return default(TR); + } + + /** <inheritDoc /> */ + public TR Execute<TA, T, TR>(Type taskType, TA taskArg) + { + _curFut.Value = Compute.Execute<TA, T, TR>(taskType, taskArg); + + return default(TR); + } + + /** <inheritDoc /> */ + public TR Execute<T, TR>(Type taskType) + { + _curFut.Value = Compute.Execute<object, T, TR>(taskType, null); + + return default(TR); + } + + /** <inheritDoc /> */ + public TR Call<TR>(IComputeFunc<TR> clo) + { + _curFut.Value = Compute.Execute(clo); + + return default(TR); + } + + /** <inheritDoc /> */ + public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo) + { + Compute.AffinityCall(cacheName, affinityKey, clo); + + return default(TR); + } + + /** <inheritDoc /> */ + public TR Call<TR>(Func<TR> func) + { + _curFut.Value = Compute.Execute(func); + + return default(TR); + } + + /** <inheritDoc /> */ + public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos) + { + _curFut.Value = Compute.Execute(clos); + + return null; + } + + /** <inheritDoc /> */ + public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc) + { + _curFut.Value = Compute.Execute(clos, rdc); + + return default(TR2); + } + + /** <inheritDoc /> */ + public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo) + { + _curFut.Value = Compute.Broadcast(clo); + + return null; + } + + /** <inheritDoc /> */ + public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg) + { + _curFut.Value = Compute.Broadcast(clo, arg); + + return null; + } + + /** <inheritDoc /> */ + public void Broadcast(IComputeAction action) + { + _curFut.Value = Compute.Broadcast(action); + } + + /** <inheritDoc /> */ + public void Run(IComputeAction action) + { + _curFut.Value = Compute.Run(action); + } + + /** <inheritDoc /> */ + public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) + { + Compute.AffinityRun(cacheName, affinityKey, action); + } + + /** <inheritDoc /> */ + public void Run(IEnumerable<IComputeAction> actions) + { + _curFut.Value = Compute.Run(actions); + } + + /** <inheritDoc /> */ + public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg) + { + _curFut.Value = Compute.Apply(clo, arg); + + return default(TR); + } + + /** <inheritDoc /> */ + public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args) + { + _curFut.Value = Compute.Apply(clo, args); + + return null; + } + + /** <inheritDoc /> */ + public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc) + { + _curFut.Value = Compute.Apply(clo, args, rdc); + + return default(TR2); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs new file mode 100644 index 0000000..a971418 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using System.Reflection; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Portable; + using Apache.Ignite.Core.Resource; + + /// <summary> + /// Non-generic version of IComputeFunc{T}. + /// </summary> + internal interface IComputeFunc : IComputeFunc<object, object> + { + // No-op + } + + /// <summary> + /// Wraps generic func into a non-generic for internal usage. + /// </summary> + internal class ComputeFuncWrapper : IComputeFunc, IPortableWriteAware + { + /** */ + private readonly object _func; + + /** */ + private readonly Func<object, object, object> _invoker; + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class. + /// </summary> + /// <param name="func">The function to wrap.</param> + /// <param name="invoker">The function invoker.</param> + public ComputeFuncWrapper(object func, Func<object, object> invoker) + { + _func = func; + + _invoker = (target, arg) => invoker(arg); + } + + /** <inheritDoc /> */ + public object Invoke(object arg) + { + try + { + return _invoker(_func, arg); + } + catch (TargetInvocationException ex) + { + throw ex.InnerException; + } + } + + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + var writer0 = (PortableWriterImpl)writer.RawWriter(); + + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, _func); + } + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeFuncWrapper"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public ComputeFuncWrapper(IPortableReader reader) + { + var reader0 = (PortableReaderImpl)reader.RawReader(); + + _func = PortableUtils.ReadPortableOrSerializable<object>(reader0); + + _invoker = DelegateTypeDescriptor.GetComputeFunc(_func.GetType()); + } + + /// <summary> + /// Injects the Ignite instance. + /// </summary> + [InstanceResource] + public void InjectIgnite(IIgnite ignite) + { + // Propagate injection + ResourceProcessor.Inject(_func, (IgniteProxy) ignite); + } + } + + /// <summary> + /// Extension methods for IComputeFunc{T}. + /// </summary> + internal static class ComputeFuncExtensions + { + /// <summary> + /// Convert to non-generic wrapper. + /// </summary> + public static IComputeFunc ToNonGeneric<T, TR>(this IComputeFunc<T, TR> func) + { + return new ComputeFuncWrapper(func, x => func.Invoke((T) x)); + } + } +}
