http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs deleted file mode 100644 index 5f719cd..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute.Closure -{ - using System; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// System job which wraps over <c>Func</c>. - /// </summary> - internal class ComputeOutFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware - { - /** Closure. */ - private readonly IComputeOutFunc _clo; - - /// <summary> - /// Constructor. 
- /// </summary> - /// <param name="clo">Closure.</param> - public ComputeOutFuncJob(IComputeOutFunc clo) - { - _clo = clo; - } - - /** <inheritDoc /> */ - public object Execute() - { - return _clo.Invoke(); - } - - /** <inheritDoc /> */ - public void Cancel() - { - throw new NotSupportedException("Func job cannot be cancelled."); - } - - /** <inheritDoc /> */ - public void Inject(Ignite grid) - { - ResourceProcessor.Inject(_clo, grid); - } - - /** <inheritDoc /> */ - public void WritePortable(IPortableWriter writer) - { - var writer0 = (PortableWriterImpl) writer.RawWriter(); - - writer0.DetachNext(); - PortableUtils.WritePortableOrSerializable(writer0, _clo); - } - - public ComputeOutFuncJob(IPortableReader reader) - { - var reader0 = (PortableReaderImpl) reader.RawReader(); - - _clo = PortableUtils.ReadPortableOrSerializable<IComputeOutFunc>(reader0); - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs deleted file mode 100644 index a84d7ce..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute.Closure -{ - using System.Collections.Generic; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Impl.Resource; - - /// <summary> - /// Closure-based task producing only one job and thus having only single result. - /// </summary> - [ComputeTaskNoResultCache] - internal class ComputeReducingClosureTask<TA, T, TR> - : ComputeAbstractClosureTask<TA, T, TR>, IComputeResourceInjector - { - /** Reducer. 
*/ - private readonly IComputeReducer<T, TR> _rdc; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="rdc">Reducer.</param> - public ComputeReducingClosureTask(IComputeReducer<T, TR> rdc) - { - _rdc = rdc; - } - - /** <inheritDoc /> */ - protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res) - { - return _rdc.Collect(res.Data()) ? ComputeJobResultPolicy.Wait : ComputeJobResultPolicy.Reduce; - } - - /** <inheritDoc /> */ - public override TR Reduce(IList<IComputeJobResult<T>> results) - { - return _rdc.Reduce(); - } - - /** <inheritDoc /> */ - public void Inject(Ignite grid) - { - ResourceProcessor.Inject(_rdc, grid); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs deleted file mode 100644 index 6e82c9b..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute.Closure -{ - using System.Collections.Generic; - using Apache.Ignite.Core.Compute; - - /// <summary> - /// Closure-based task producing only one job and thus having only single result. - /// </summary> - [ComputeTaskNoResultCache] - internal class ComputeSingleClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR> where TR : T - { - /** Result. */ - private TR _res; - - /** <inheritDoc /> */ - protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res) - { - _res = (TR) res.Data(); - - // No more results are expected at this point, but we prefer not to alter regular - // task flow. - return ComputeJobResultPolicy.Wait; - } - - /** <inheritDoc /> */ - public override TR Reduce(IList<IComputeJobResult<T>> results) - { - return _res; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs deleted file mode 100644 index 8d3e8d7..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute.Closure -{ - /// <summary> - /// Interface denoting entity which must perform custom resource injection. - /// </summary> - internal interface IComputeResourceInjector - { - /// <summary> - /// Inject resources. - /// </summary> - /// <param name="grid">Grid.</param> - void Inject(Ignite grid); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs deleted file mode 100644 index 7efabd1..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Collections.Generic; - using System.Diagnostics; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - - /// <summary> - /// Synchronous Compute facade. - /// </summary> - internal class Compute : ICompute - { - /** */ - private readonly ComputeImpl _compute; - - /// <summary> - /// Initializes a new instance of the <see cref="Compute"/> class. - /// </summary> - /// <param name="computeImpl">The compute implementation.</param> - public Compute(ComputeImpl computeImpl) - { - Debug.Assert(computeImpl != null); - - _compute = computeImpl; - } - - /** <inheritDoc /> */ - public ICompute WithAsync() - { - return new ComputeAsync(_compute); - } - - /** <inheritDoc /> */ - public bool IsAsync - { - get { return false; } - } - - /** <inheritDoc /> */ - public IFuture GetFuture() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** <inheritDoc /> */ - public IFuture<TResult> GetFuture<TResult>() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** <inheritDoc /> */ - public IClusterGroup ClusterGroup - { - get { return _compute.ClusterGroup; } - } - - /** <inheritDoc /> */ - public ICompute WithNoFailover() - { - _compute.WithNoFailover(); - - return this; - } - - /** <inheritDoc /> */ - public ICompute WithTimeout(long timeout) - { - _compute.WithTimeout(timeout); - - return this; - } - - /** <inheritDoc /> */ - public ICompute WithKeepPortable() - { - _compute.WithKeepPortable(); - - return 
this; - } - - /** <inheritDoc /> */ - public T ExecuteJavaTask<T>(string taskName, object taskArg) - { - return _compute.ExecuteJavaTask<T>(taskName, taskArg); - } - - /** <inheritDoc /> */ - public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg) - { - return _compute.Execute(task, taskArg).Get(); - } - - /** <inheritDoc /> */ - public TR Execute<T, TR>(IComputeTask<T, TR> task) - { - return _compute.Execute(task, null).Get(); - } - - /** <inheritDoc /> */ - public TR Execute<TA, T, TR>(Type taskType, TA taskArg) - { - return _compute.Execute<TA, T, TR>(taskType, taskArg).Get(); - } - - public TR Execute<T, TR>(Type taskType) - { - return _compute.Execute<object, T, TR>(taskType, null).Get(); - } - - /** <inheritDoc /> */ - public TR Call<TR>(IComputeFunc<TR> clo) - { - return _compute.Execute(clo).Get(); - } - - /** <inheritDoc /> */ - public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo) - { - return _compute.AffinityCall(cacheName, affinityKey, clo).Get(); - } - - /** <inheritDoc /> */ - public TR Call<TR>(Func<TR> func) - { - return _compute.Execute(func).Get(); - } - - /** <inheritDoc /> */ - public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos) - { - return _compute.Execute(clos).Get(); - } - - /** <inheritDoc /> */ - public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc) - { - return _compute.Execute(clos, rdc).Get(); - } - - /** <inheritDoc /> */ - public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo) - { - return _compute.Broadcast(clo).Get(); - } - - /** <inheritDoc /> */ - public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg) - { - return _compute.Broadcast(clo, arg).Get(); - } - - /** <inheritDoc /> */ - public void Broadcast(IComputeAction action) - { - _compute.Broadcast(action).Get(); - } - - /** <inheritDoc /> */ - public void Run(IComputeAction action) - { - _compute.Run(action).Get(); - } - - /** <inheritDoc /> */ - public 
void AffinityRun(string cacheName, object affinityKey, IComputeAction action) - { - _compute.AffinityRun(cacheName, affinityKey, action).Get(); - } - - /** <inheritDoc /> */ - public void Run(IEnumerable<IComputeAction> actions) - { - _compute.Run(actions).Get(); - } - - /** <inheritDoc /> */ - public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg) - { - return _compute.Apply(clo, arg).Get(); - } - - /** <inheritDoc /> */ - public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args) - { - return _compute.Apply(clo, args).Get(); - } - - /** <inheritDoc /> */ - public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc) - { - return _compute.Apply(clo, args, rdc).Get(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs deleted file mode 100644 index 199afc2..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Collections.Generic; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - - /// <summary> - /// Asynchronous Compute facade. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class ComputeAsync : ICompute - { - /** */ - protected readonly ComputeImpl Compute; - - /** Current future. */ - private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>(); - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeAsync"/> class. 
/// </summary>
/// <param name="computeImpl">The compute implementation.</param>
internal ComputeAsync(ComputeImpl computeImpl)
{
    Compute = computeImpl;
}

/** <inheritDoc /> */
public ICompute WithAsync()
{
    // This facade is already asynchronous.
    return this;
}

/** <inheritDoc /> */
public bool IsAsync
{
    get { return true; }
}

/** <inheritDoc /> */
public IFuture GetFuture()
{
    return GetFuture<object>();
}

/// <summary>
/// Returns the future stored by the last operation started on the current thread
/// and clears it, so each started operation can be claimed only once.
/// </summary>
/// <typeparam name="TResult">Expected future result type.</typeparam>
/// <returns>Future of the last started operation.</returns>
/// <exception cref="InvalidOperationException">
/// No operation was started on this thread, or the stored future is not
/// an <c>IFuture{TResult}</c>.
/// </exception>
public IFuture<TResult> GetFuture<TResult>()
{
    var fut = _curFut.Value;

    if (fut == null)
        throw new InvalidOperationException("Asynchronous operation not started.");

    var fut0 = fut as IFuture<TResult>;

    // On a type mismatch the stored future is intentionally left in place,
    // so the caller may retry with the correct type.
    if (fut0 == null)
        throw new InvalidOperationException(
            string.Format("Requested future type {0} is incompatible with current future type {1}",
                typeof(IFuture<TResult>), fut.GetType()));

    _curFut.Value = null;

    return fut0;
}

/** <inheritDoc /> */
public IClusterGroup ClusterGroup
{
    get { return Compute.ClusterGroup; }
}

/** <inheritDoc /> */
public ICompute WithNoFailover()
{
    Compute.WithNoFailover();

    return this;
}

/** <inheritDoc /> */
public ICompute WithTimeout(long timeout)
{
    Compute.WithTimeout(timeout);

    return this;
}

/** <inheritDoc /> */
public ICompute WithKeepPortable()
{
    Compute.WithKeepPortable();

    return this;
}

// Every operation below follows the same contract: start the operation, remember its
// future in the thread-local slot, and return a placeholder value. The real result is
// obtained through GetFuture().

/** <inheritDoc /> */
public T ExecuteJavaTask<T>(string taskName, object taskArg)
{
    _curFut.Value = Compute.ExecuteJavaTaskAsync<T>(taskName, taskArg);

    return default(T);
}

/** <inheritDoc /> */
public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
{
    _curFut.Value = Compute.Execute(task, taskArg);

    return default(TR);
}

/** <inheritDoc /> */
public TR Execute<T, TR>(IComputeTask<T, TR> task)
{
    _curFut.Value = Compute.Execute(task, null);

    return default(TR);
}

/** <inheritDoc /> */
public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
{
    _curFut.Value = Compute.Execute<TA, T, TR>(taskType, taskArg);

    return default(TR);
}

/** <inheritDoc /> */
public TR Execute<T, TR>(Type taskType)
{
    _curFut.Value = Compute.Execute<object, T, TR>(taskType, null);

    return default(TR);
}

/** <inheritDoc /> */
public TR Call<TR>(IComputeFunc<TR> clo)
{
    _curFut.Value = Compute.Execute(clo);

    return default(TR);
}

/** <inheritDoc /> */
public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
{
    // The future must be stored like in every other operation; otherwise GetFuture()
    // reports "not started" or hands out a stale future from a previous call.
    _curFut.Value = Compute.AffinityCall(cacheName, affinityKey, clo);

    return default(TR);
}

/** <inheritDoc /> */
public TR Call<TR>(Func<TR> func)
{
    _curFut.Value = Compute.Execute(func);

    return default(TR);
}

/** <inheritDoc /> */
public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
{
    _curFut.Value = Compute.Execute(clos);

    return null;
}

/** <inheritDoc /> */
public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
{
    _curFut.Value = Compute.Execute(clos, rdc);

    return default(TR2);
}

/** <inheritDoc /> */
public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
{
    _curFut.Value = Compute.Broadcast(clo);

    return null;
}

/** <inheritDoc /> */
public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
{
    _curFut.Value = Compute.Broadcast(clo, arg);

    return null;
}

/** <inheritDoc /> */
public void Broadcast(IComputeAction action)
{
    _curFut.Value = Compute.Broadcast(action);
}

/** <inheritDoc /> */
public void Run(IComputeAction action)
{
    _curFut.Value = Compute.Run(action);
}

/** <inheritDoc /> */
public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
{
    // Store the future so that GetFuture() can return it (same contract as Run).
    _curFut.Value = Compute.AffinityRun(cacheName, affinityKey, action);
}

/** <inheritDoc /> */
public void Run(IEnumerable<IComputeAction> actions)
{
    _curFut.Value = Compute.Run(actions);
}

/** <inheritDoc /> */
public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg) - { - _curFut.Value = Compute.Apply(clo, arg); - - return default(TR); - } - - /** <inheritDoc /> */ - public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args) - { - _curFut.Value = Compute.Apply(clo, args); - - return null; - } - - /** <inheritDoc /> */ - public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc) - { - _curFut.Value = Compute.Apply(clo, args, rdc); - - return default(TR2); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs deleted file mode 100644 index a971418..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Reflection; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Portable; - using Apache.Ignite.Core.Resource; - - /// <summary> - /// Non-generic version of IComputeFunc{T}. - /// </summary> - internal interface IComputeFunc : IComputeFunc<object, object> - { - // No-op - } - - /// <summary> - /// Wraps generic func into a non-generic for internal usage. - /// </summary> - internal class ComputeFuncWrapper : IComputeFunc, IPortableWriteAware - { - /** */ - private readonly object _func; - - /** */ - private readonly Func<object, object, object> _invoker; - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class. - /// </summary> - /// <param name="func">The function to wrap.</param> - /// <param name="invoker">The function invoker.</param> - public ComputeFuncWrapper(object func, Func<object, object> invoker) - { - _func = func; - - _invoker = (target, arg) => invoker(arg); - } - - /** <inheritDoc /> */ - public object Invoke(object arg) - { - try - { - return _invoker(_func, arg); - } - catch (TargetInvocationException ex) - { - throw ex.InnerException; - } - } - - /** <inheritDoc /> */ - public void WritePortable(IPortableWriter writer) - { - var writer0 = (PortableWriterImpl)writer.RawWriter(); - - writer0.DetachNext(); - PortableUtils.WritePortableOrSerializable(writer0, _func); - } - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeFuncWrapper"/> class. 
- /// </summary> - /// <param name="reader">The reader.</param> - public ComputeFuncWrapper(IPortableReader reader) - { - var reader0 = (PortableReaderImpl)reader.RawReader(); - - _func = PortableUtils.ReadPortableOrSerializable<object>(reader0); - - _invoker = DelegateTypeDescriptor.GetComputeFunc(_func.GetType()); - } - - /// <summary> - /// Injects the Ignite instance. - /// </summary> - [InstanceResource] - public void InjectIgnite(IIgnite ignite) - { - // Propagate injection - ResourceProcessor.Inject(_func, (IgniteProxy) ignite); - } - } - - /// <summary> - /// Extension methods for IComputeFunc{T}. - /// </summary> - internal static class ComputeFuncExtensions - { - /// <summary> - /// Convert to non-generic wrapper. - /// </summary> - public static IComputeFunc ToNonGeneric<T, TR>(this IComputeFunc<T, TR> func) - { - return new ComputeFuncWrapper(func, x => func.Invoke((T) x)); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs deleted file mode 100644 index f0ff968..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ /dev/null @@ -1,645 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Collections; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.Linq; - using System.Runtime.Serialization; - using System.Threading; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Impl.Cluster; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Compute.Closure; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - - /// <summary> - /// Compute implementation. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class ComputeImpl : PlatformTarget - { - /** */ - private const int OpAffinity = 1; - - /** */ - private const int OpBroadcast = 2; - - /** */ - private const int OpExec = 3; - - /** */ - private const int OpExecAsync = 4; - - /** */ - private const int OpUnicast = 5; - - /** Underlying projection. */ - private readonly ClusterGroupImpl _prj; - - /** Whether objects must be kept portable. */ - private readonly ThreadLocal<bool> _keepPortable = new ThreadLocal<bool>(() => false); - - /// <summary> - /// Constructor. 
/// </summary>
/// <param name="target">Target.</param>
/// <param name="marsh">Marshaller.</param>
/// <param name="prj">Projection.</param>
/// <param name="keepPortable">"keepPortable" flag.</param>
public ComputeImpl(IUnmanagedTarget target, PortableMarshaller marsh, ClusterGroupImpl prj, bool keepPortable)
    : base(target, marsh)
{
    // NOTE(review): _keepPortable is a ThreadLocal, so this assignment is visible only to
    // the constructing thread; other threads start from the default (false). Confirm intended.
    _keepPortable.Value = keepPortable;

    _prj = prj;
}

/// <summary>
/// Gets the cluster group (projection) this compute instance was created for.
/// </summary>
public IClusterGroup ClusterGroup
{
    get { return _prj; }
}

/// <summary>
/// Requests the no-failover mode from the native target.
/// Per the original contract the flag applies to the next task executed on this projection
/// in the current thread and is reset once that task starts.
/// </summary>
public void WithNoFailover()
{
    UU.ComputeWithNoFailover(Target);
}

/// <summary>
/// Passes the task timeout to the native target.
/// Per the original contract the timeout applies to the next task executed on this projection
/// in the current thread and is used only once.
/// </summary>
/// <param name="timeout">Computation timeout in milliseconds.</param>
public void WithTimeout(long timeout)
{
    UU.ComputeWithTimeout(Target, timeout);
}

/// <summary>
/// Raises the thread-local keep-portable flag, so that the next Java task executed from
/// the current thread exchanges its argument and results without deserialization.
/// </summary>
public void WithKeepPortable()
{
    _keepPortable.Value = true;
}

/// <summary>
/// Executes given Java task on the grid projection. If task for given name has not been deployed yet,
/// then 'taskName' will be used as task class name to auto-deploy the task.
/// </summary>
public T ExecuteJavaTask<T>(string taskName, object taskArg)
{
    IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");

    // Nodes are resolved only when the projection has a predicate; otherwise null is written.
    ICollection<IClusterNode> targetNodes = null;

    if (_prj.Predicate != null)
        targetNodes = _prj.GetNodes();

    try
    {
        return DoOutInOp<T>(OpExec, w =>
        {
            WriteTask(w, taskName, taskArg, targetNodes);
        });
    }
    finally
    {
        // The keep-portable flag is one-shot: always reset it after the call.
        _keepPortable.Value = false;
    }
}

/// <summary>
/// Asynchronous counterpart of the Java task execution: starts the named Java task on the
/// grid projection and returns a future for its result.
/// If task for given name has not been deployed yet,
/// then 'taskName' will be used as task class name to auto-deploy the task.
/// </summary>
public IFuture<T> ExecuteJavaTaskAsync<T>(string taskName, object taskArg)
{
    IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");

    ICollection<IClusterNode> targetNodes = null;

    if (_prj.Predicate != null)
        targetNodes = _prj.GetNodes();

    try
    {
        IFuture<T> result = null;

        DoOutInOp(OpExecAsync, w =>
        {
            WriteTask(w, taskName, taskArg, targetNodes);
        }, unused =>
        {
            // The flag is read here, inside the callback, i.e. before "finally" resets it.
            result = GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp),
                _keepPortable.Value);
        });

        return result;
    }
    finally
    {
        _keepPortable.Value = false;
    }
}

/// <summary>
/// Executes given task on the grid projection. For step-by-step explanation of task execution process
/// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
/// </summary>
/// <param name="task">Task to execute.</param>
/// <param name="taskArg">Optional task argument.</param>
/// <returns>Task result.</returns>
public IFuture<TR> Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
{
    IgniteArgumentCheck.NotNull(task, "task");

    var taskHolder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, taskArg);

    // Register the holder in the handle registry and pass the handle to the native call.
    long handle = Marshaller.Ignite.HandleRegistry.Allocate(taskHolder);

    UU.ComputeExecuteNative(Target, handle, _prj.TopologyVersion);

    return taskHolder.Future;
}

/// <summary>
/// Creates a task instance of the given type without running any constructor
/// and executes it on the grid projection.
/// </summary>
/// <param name="taskType">Task type.</param>
/// <param name="taskArg">Optional task argument.</param>
/// <returns>Task result.</returns>
public IFuture<TR> Execute<TA, T, TR>(Type taskType, TA taskArg)
{
    IgniteArgumentCheck.NotNull(taskType, "taskType");

    var typedTask = FormatterServices.GetUninitializedObject(taskType) as IComputeTask<TA, T, TR>;

    if (typedTask != null)
        return Execute(typedTask, taskArg);

    throw new IgniteException("Task type doesn't implement IComputeTask: " + taskType.Name);
}

/// <summary>
/// Runs the given closure as a single job on the grid projection; the returned
/// future carries the job result.
/// </summary>
/// <param name="clo">Job to execute.</param>
/// <returns>Job result for this execution.</returns>
public IFuture<TR> Execute<TR>(IComputeFunc<TR> clo)
{
    IgniteArgumentCheck.NotNull(clo, "clo");

    var task = new ComputeSingleClosureTask<object, TR, TR>();
    var job = new ComputeOutFuncJob(clo.ToNonGeneric());

    return ExecuteClosures0(task, job, null, false);
}

/// <summary>
/// Executes provided delegate on a node in this grid projection. The result of the
/// job execution is returned from the result closure.
-        /// </summary>
-        /// <param name="func">Func to execute.</param>
-        /// <returns>Job result for this execution.</returns>
-        public IFuture<TR> Execute<TR>(Func<TR> func)
-        {
-            IgniteArgumentCheck.NotNull(func, "func");
-
-            // The delegate is wrapped twice: first into a non-generic func, then into a job.
-            var funcJob = new ComputeOutFuncJob(new ComputeOutFuncWrapper(func, () => func()));
-
-            return ExecuteClosures0(new ComputeSingleClosureTask<object, TR, TR>(), funcJob, null, false);
-        }
-
-        /// <summary>
-        /// Executes a collection of jobs on nodes within this grid projection; one job is created
-        /// per closure.
-        /// </summary>
-        /// <param name="clos">Collection of jobs to execute.</param>
-        /// <returns>Collection of job results for this execution.</returns>
-        public IFuture<ICollection<TR>> Execute<TR>(IEnumerable<IComputeFunc<TR>> clos)
-        {
-            IgniteArgumentCheck.NotNull(clos, "clos");
-
-            // Pre-size the list when the closure count is known (zero otherwise).
-            var jobList = new List<IComputeJob>(GetCountOrZero(clos));
-
-            jobList.AddRange(clos.Select(c => (IComputeJob) new ComputeOutFuncJob(c.ToNonGeneric())));
-
-            return ExecuteClosures0(new ComputeMultiClosureTask<object, TR, ICollection<TR>>(jobList.Count),
-                null, jobList, false);
-        }
-
-        /// <summary>
-        /// Executes a collection of jobs on nodes within this grid projection and reduces
-        /// their results with the given reducer.
-        /// </summary>
-        /// <param name="clos">Collection of jobs to execute.</param>
-        /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
-        /// <returns>Reduced job result for this execution.</returns>
-        public IFuture<TR2> Execute<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
-        {
-            IgniteArgumentCheck.NotNull(clos, "clos");
-
-            // Pre-size the list when the closure count is known (zero otherwise).
-            var jobList = new List<IComputeJob>(GetCountOrZero(clos));
-
-            jobList.AddRange(clos.Select(c => (IComputeJob) new ComputeOutFuncJob(c.ToNonGeneric())));
-
-            return ExecuteClosures0(new ComputeReducingClosureTask<object, TR1, TR2>(rdc), null, jobList, false);
-        }
-
-        /// <summary>
-        /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
-        /// </summary>
-        /// <param name="clo">Job to broadcast to all projection nodes.</param>
-        /// <returns>Collection of results for this execution.</returns>
-        public IFuture<ICollection<TR>> Broadcast<TR>(IComputeFunc<TR> clo)
-        {
-            IgniteArgumentCheck.NotNull(clo, "clo");
-
-            var multiTask = new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1);
-            var funcJob = new ComputeOutFuncJob(clo.ToNonGeneric());
-
-            // Last argument is the broadcast flag.
-            return ExecuteClosures0(multiTask, funcJob, null, true);
-        }
-
-        /// <summary>
-        /// Broadcasts given closure job, together with the passed in argument, to all nodes
-        /// in grid projection. Every participating node will return a job result.
-        /// </summary>
-        /// <param name="clo">Job to broadcast to all projection nodes.</param>
-        /// <param name="arg">Job closure argument.</param>
-        /// <returns>Collection of results for this execution.</returns>
-        public IFuture<ICollection<TR>> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
-        {
-            IgniteArgumentCheck.NotNull(clo, "clo");
-
-            var multiTask = new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1);
-            var funcJob = new ComputeFuncJob(clo.ToNonGeneric(), arg);
-
-            // Last argument is the broadcast flag.
-            return ExecuteClosures0(multiTask, funcJob, null, true);
-        }
-
-        /// <summary>
-        /// Broadcasts given action to all nodes in grid projection.
-        /// </summary>
-        /// <param name="action">Job to broadcast to all projection nodes.</param>
-        public IFuture<object> Broadcast(IComputeAction action)
-        {
-            IgniteArgumentCheck.NotNull(action, "action");
-
-            var singleTask = new ComputeSingleClosureTask<object, object, object>();
-            var actionJob = new ComputeActionJob(action);
-
-            return ExecuteClosures0(singleTask, actionJob, opId: OpBroadcast);
-        }
-
-        /// <summary>
-        /// Executes provided action on a node in this grid projection.
-        /// </summary>
-        /// <param name="action">Job to execute.</param>
-        public IFuture<object> Run(IComputeAction action)
-        {
-            IgniteArgumentCheck.NotNull(action, "action");
-
-            var singleTask = new ComputeSingleClosureTask<object, object, object>();
-            var actionJob = new ComputeActionJob(action);
-
-            // No op code given: the default (unicast) operation is used.
-            return ExecuteClosures0(singleTask, actionJob);
-        }
-
-        /// <summary>
-        /// Executes collection of jobs on Ignite nodes within this grid projection.
-        /// </summary>
-        /// <param name="actions">Jobs to execute.</param>
-        public IFuture<object> Run(IEnumerable<IComputeAction> actions)
-        {
-            IgniteArgumentCheck.NotNull(actions, "actions");
-
-            var sized = actions as ICollection;
-
-            if (sized != null)
-            {
-                // The count is known up front, so the jobs stay a lazy sequence that is
-                // enumerated later by ExecuteClosures0.
-                var lazyJobs = actions.Select(a => new ComputeActionJob(a));
-
-                return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: lazyJobs,
-                    jobsCount: sized.Count);
-            }
-
-            // Count is unknown: materialize the jobs first to learn how many there are.
-            var jobList = actions.Select(a => new ComputeActionJob(a)).ToList();
-
-            return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobList,
-                jobsCount: jobList.Count);
-        }
-
-        /// <summary>
-        /// Executes provided closure job, with the given argument, on a node in this grid projection.
-        /// </summary>
-        /// <param name="clo">Job to run.</param>
-        /// <param name="arg">Job argument.</param>
-        /// <returns>Job result for this execution.</returns>
-        public IFuture<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
-        {
-            IgniteArgumentCheck.NotNull(clo, "clo");
-
-            var singleTask = new ComputeSingleClosureTask<T, TR, TR>();
-            var funcJob = new ComputeFuncJob(clo.ToNonGeneric(), arg);
-
-            return ExecuteClosures0(singleTask, funcJob, null, false);
-        }
-
-        /// <summary>
-        /// Executes provided closure job on nodes within this grid projection. A new job is executed for
-        /// every argument in the passed in collection. The number of actual job executions will be
-        /// equal to size of the job arguments collection.
-        /// </summary>
-        /// <param name="clo">Job to run.</param>
-        /// <param name="args">Job arguments.</param>
-        /// <returns>Collection of job results.</returns>
-        public IFuture<ICollection<TR>> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
-        {
-            IgniteArgumentCheck.NotNull(clo, "clo");
-
-            // Fail fast on a missing argument collection instead of failing later in the foreach below.
-            IgniteArgumentCheck.NotNull(args, "args");
-
-            var jobs = new List<IComputeJob>(GetCountOrZero(args));
-
-            // Convert the closure once; every job shares the same non-generic wrapper.
-            var func = clo.ToNonGeneric();
-
-            foreach (T arg in args)
-                jobs.Add(new ComputeFuncJob(func, arg));
-
-            return ExecuteClosures0(new ComputeMultiClosureTask<T, TR, ICollection<TR>>(jobs.Count),
-                null, jobs, false);
-        }
-
-        /// <summary>
-        /// Executes provided closure job on nodes within this grid projection. A new job is executed for
-        /// every argument in the passed in collection. The number of actual job executions will be
-        /// equal to size of the job arguments collection. The returned job results will be reduced
-        /// into an individual result by provided reducer.
-        /// </summary>
-        /// <param name="clo">Job to run.</param>
-        /// <param name="args">Job arguments.</param>
-        /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
-        /// <returns>Reduced job result for this execution.</returns>
-        public IFuture<TR2> Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args,
-            IComputeReducer<TR1, TR2> rdc)
-        {
-            // One check per parameter: closure, argument collection and reducer.
-            IgniteArgumentCheck.NotNull(clo, "clo");
-
-            IgniteArgumentCheck.NotNull(args, "args");
-
-            IgniteArgumentCheck.NotNull(rdc, "rdc");
-
-            ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(args));
-
-            // Convert the closure once; every job shares the same non-generic wrapper.
-            var func = clo.ToNonGeneric();
-
-            foreach (T arg in args)
-                jobs.Add(new ComputeFuncJob(func, arg));
-
-            return ExecuteClosures0(new ComputeReducingClosureTask<T, TR1, TR2>(rdc),
-                null, jobs, false);
-        }
-
-        /// <summary>
-        /// Executes given job on the node where data for provided affinity key is located
-        /// (a.k.a. affinity co-location).
-        /// </summary>
-        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
-        /// <param name="affinityKey">Affinity key.</param>
-        /// <param name="action">Job to execute.</param>
-        public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action)
-        {
-            IgniteArgumentCheck.NotNull(action, "action");
-
-            // Cache name and affinity key are written by the custom write action.
-            Action<PortableWriterImpl> affinityWriter = w => WriteAffinity(w, cacheName, affinityKey);
-
-            var singleTask = new ComputeSingleClosureTask<object, object, object>();
-            var actionJob = new ComputeActionJob(action);
-
-            return ExecuteClosures0(singleTask, actionJob, opId: OpAffinity, writeAction: affinityWriter);
-        }
-
-        /// <summary>
-        /// Executes given closure on the node where data for provided affinity key is located
-        /// (a.k.a. affinity co-location) and returns a future of its result.
-        /// </summary>
-        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
-        /// <param name="affinityKey">Affinity key.</param>
-        /// <param name="clo">Job to execute.</param>
-        /// <returns>Job result for this execution.</returns>
-        /// <typeparam name="TR">Type of job result.</typeparam>
-        public IFuture<TR> AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
-        {
-            IgniteArgumentCheck.NotNull(clo, "clo");
-
-            // Cache name and affinity key are written by the custom write action.
-            Action<PortableWriterImpl> affinityWriter = w => WriteAffinity(w, cacheName, affinityKey);
-
-            var singleTask = new ComputeSingleClosureTask<object, TR, TR>();
-            var funcJob = new ComputeOutFuncJob(clo.ToNonGeneric());
-
-            return ExecuteClosures0(singleTask, funcJob, opId: OpAffinity, writeAction: affinityWriter);
-        }
-
-        /** <inheritDoc /> */
-        protected override T Unmarshal<T>(IPortableStream stream)
-        {
-            // Read the current keep-portable flag and pass it on to the marshaller.
-            var keepPortable = _keepPortable.Value;
-
-            return Marshaller.Unmarshal<T>(stream, keepPortable);
-        }
-
-        /// <summary>
-        /// Internal routine for closure-based task execution.
- /// </summary> - /// <param name="task">Task.</param> - /// <param name="job">Job.</param> - /// <param name="jobs">Jobs.</param> - /// <param name="broadcast">Broadcast flag.</param> - /// <returns>Future.</returns> - private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job, - ICollection<IComputeJob> jobs, bool broadcast) - { - return ExecuteClosures0(task, job, jobs, broadcast ? OpBroadcast : OpUnicast, - jobs == null ? 1 : jobs.Count); - } - - /// <summary> - /// Internal routine for closure-based task execution. - /// </summary> - /// <param name="task">Task.</param> - /// <param name="job">Job.</param> - /// <param name="jobs">Jobs.</param> - /// <param name="opId">Op code.</param> - /// <param name="jobsCount">Jobs count.</param> - /// <param name="writeAction">Custom write action.</param> - /// <returns>Future.</returns> - [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", - Justification = "User code can throw any exception")] - private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job = null, - IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0, - Action<PortableWriterImpl> writeAction = null) - { - Debug.Assert(job != null || jobs != null); - - var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, default(TA)); - - var taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(holder); - - var jobHandles = new List<long>(job != null ? 
1 : jobsCount); - - try - { - Exception err = null; - - try - { - DoOutOp(opId, writer => - { - writer.WriteLong(taskHandle); - - if (job != null) - { - writer.WriteInt(1); - - jobHandles.Add(WriteJob(job, writer)); - } - else - { - writer.WriteInt(jobsCount); - - Debug.Assert(jobs != null, "jobs != null"); - - jobHandles.AddRange(jobs.Select(jobEntry => WriteJob(jobEntry, writer))); - } - - holder.JobHandles(jobHandles); - - if (writeAction != null) - writeAction(writer); - }); - } - catch (Exception e) - { - err = e; - } - - if (err != null) - { - // Manual job handles release because they were not assigned to the task yet. - foreach (var hnd in jobHandles) - Marshaller.Ignite.HandleRegistry.Release(hnd); - - holder.CompleteWithError(taskHandle, err); - } - } - catch (Exception e) - { - // This exception means that out-op failed. - holder.CompleteWithError(taskHandle, e); - } - - return holder.Future; - } - - /// <summary> - /// Writes the job. - /// </summary> - /// <param name="job">The job.</param> - /// <param name="writer">The writer.</param> - /// <returns>Handle to the job holder</returns> - private long WriteJob(IComputeJob job, PortableWriterImpl writer) - { - var jobHolder = new ComputeJobHolder((Ignite) _prj.Ignite, job); - - var jobHandle = Marshaller.Ignite.HandleRegistry.Allocate(jobHolder); - - writer.WriteLong(jobHandle); - writer.WriteObject(jobHolder); - - return jobHandle; - } - - /// <summary> - /// Write task to the writer. - /// </summary> - /// <param name="writer">Writer.</param> - /// <param name="taskName">Task name.</param> - /// <param name="taskArg">Task arg.</param> - /// <param name="nodes">Nodes.</param> - private void WriteTask(PortableWriterImpl writer, string taskName, object taskArg, - ICollection<IClusterNode> nodes) - { - writer.WriteString(taskName); - writer.WriteBoolean(_keepPortable.Value); - writer.Write(taskArg); - - WriteNodeIds(writer, nodes); - } - - /// <summary> - /// Write node IDs. 
-        /// </summary>
-        /// <param name="writer">Writer.</param>
-        /// <param name="nodes">Nodes.</param>
-        private static void WriteNodeIds(PortableWriterImpl writer, ICollection<IClusterNode> nodes)
-        {
-            // A leading flag tells whether a node list follows.
-            if (nodes == null)
-            {
-                writer.WriteBoolean(false);
-
-                return;
-            }
-
-            writer.WriteBoolean(true);
-            writer.WriteInt(nodes.Count);
-
-            foreach (var clusterNode in nodes)
-                writer.WriteGuid(clusterNode.Id);
-        }
-
-        /// <summary>
-        /// Writes the affinity info: cache name first, then the affinity key.
-        /// </summary>
-        /// <param name="writer">The writer.</param>
-        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
-        /// <param name="affinityKey">Affinity key.</param>
-        private static void WriteAffinity(PortableWriterImpl writer, string cacheName, object affinityKey)
-        {
-            writer.WriteString(cacheName);
-            writer.WriteObject(affinityKey);
-        }
-
-        /// <summary>
-        /// Gets the element count when the object is a non-generic <c>ICollection</c>, zero otherwise.
-        /// </summary>
-        private static int GetCountOrZero(object collection)
-        {
-            var sized = collection as ICollection;
-
-            if (sized != null)
-                return sized.Count;
-
-            return 0;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
deleted file mode 100644
index f4ed999..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Reflection; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Portable; - using Apache.Ignite.Core.Resource; - - /// <summary> - /// Non-generic version of IComputeJob{T}. - /// </summary> - internal interface IComputeJob : IComputeJob<object> - { - // No-op. - } - - /// <summary> - /// Wraps generic func into a non-generic for internal usage. - /// </summary> - internal class ComputeJobWrapper : IComputeJob, IPortableWriteAware - { - /** */ - private readonly Func<object, object> _execute; - - /** */ - private readonly Action<object> _cancel; - - /** */ - private readonly object _job; - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeJobWrapper"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public ComputeJobWrapper(IPortableReader reader) - { - var reader0 = (PortableReaderImpl)reader.RawReader(); - - _job = PortableUtils.ReadPortableOrSerializable<object>(reader0); - - DelegateTypeDescriptor.GetComputeJob(_job.GetType(), out _execute, out _cancel); - } - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class. 
- /// </summary> - public ComputeJobWrapper(object job, Func<object, object> execute, Action<object> cancel) - { - _job = job; - - _execute = execute; - - _cancel = cancel; - } - - /** <inheritDoc /> */ - public object Execute() - { - try - { - return _execute(_job); - } - catch (TargetInvocationException ex) - { - throw ex.InnerException; - } - } - - /** <inheritDoc /> */ - public void Cancel() - { - try - { - _cancel(_job); - } - catch (TargetInvocationException ex) - { - throw ex.InnerException; - } - } - - /** <inheritDoc /> */ - public void WritePortable(IPortableWriter writer) - { - var writer0 = (PortableWriterImpl)writer.RawWriter(); - - writer0.DetachNext(); - PortableUtils.WritePortableOrSerializable(writer0, Job); - } - - /// <summary> - /// Injects Ignite instance into wrapped object. - /// </summary> - [InstanceResource] - public void InjectIgnite(IIgnite ignite) - { - // Propagate injection - ResourceProcessor.Inject(Job, (IgniteProxy)ignite); - } - - /// <summary> - /// Gets the inner job. - /// </summary> - public object Job - { - get { return _job; } - } - } - - /// <summary> - /// Extension methods for IComputeJob{T}. - /// </summary> - internal static class ComputeJobExtensions - { - /// <summary> - /// Convert to non-generic wrapper. - /// </summary> - public static IComputeJob ToNonGeneric<T>(this IComputeJob<T> job) - { - return new ComputeJobWrapper(job, x => job.Execute(), x => job.Cancel()); - } - - /// <summary> - /// Unwraps job of one type into job of another type. - /// </summary> - public static IComputeJob<TR> Unwrap<T, TR>(this IComputeJob<T> job) - { - var wrapper = job as ComputeJobWrapper; - - return wrapper != null ? (IComputeJob<TR>) wrapper.Job : (IComputeJob<TR>) job; - } - - /// <summary> - /// Unwraps job of one type into job of another type. - /// </summary> - public static object Unwrap(this IComputeJob<object> job) - { - var wrapper = job as ComputeJobWrapper; - - return wrapper != null ? 
wrapper.Job : job; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs deleted file mode 100644 index a0de895..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Impl.Cluster; - using Apache.Ignite.Core.Impl.Compute.Closure; - using Apache.Ignite.Core.Impl.Memory; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Holder for user-provided compute job. 
- /// </summary> - internal class ComputeJobHolder : IPortableWriteAware - { - /** Actual job. */ - private readonly IComputeJob _job; - - /** Owning grid. */ - private readonly Ignite _ignite; - - /** Result (set for local jobs only). */ - private volatile ComputeJobResultImpl _jobRes; - - /// <summary> - /// Default ctor for marshalling. - /// </summary> - /// <param name="reader"></param> - public ComputeJobHolder(IPortableReader reader) - { - Debug.Assert(reader != null); - - var reader0 = (PortableReaderImpl) reader.RawReader(); - - _ignite = reader0.Marshaller.Ignite; - - _job = PortableUtils.ReadPortableOrSerializable<IComputeJob>(reader0); - } - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="grid">Grid.</param> - /// <param name="job">Job.</param> - public ComputeJobHolder(Ignite grid, IComputeJob job) - { - Debug.Assert(grid != null); - Debug.Assert(job != null); - - _ignite = grid; - _job = job; - } - - /// <summary> - /// Executes local job. - /// </summary> - /// <param name="cancel">Cancel flag.</param> - public void ExecuteLocal(bool cancel) - { - object res; - bool success; - - Execute0(cancel, out res, out success); - - _jobRes = new ComputeJobResultImpl( - success ? res : null, - success ? null : res as Exception, - _job, - _ignite.GetLocalNode().Id, - cancel - ); - } - - /// <summary> - /// Execute job serializing result to the stream. - /// </summary> - /// <param name="cancel">Whether the job must be cancelled.</param> - /// <param name="stream">Stream.</param> - public void ExecuteRemote(PlatformMemoryStream stream, bool cancel) - { - // 1. Execute job. - object res; - bool success; - - Execute0(cancel, out res, out success); - - // 2. Try writing result to the stream. - ClusterGroupImpl prj = _ignite.ClusterGroup; - - PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream); - - try - { - // 3. Marshal results. - PortableUtils.WriteWrappedInvocationResult(writer, success, res); - } - finally - { - // 4. 
Process metadata. - prj.FinishMarshal(writer); - } - } - - /// <summary> - /// Cancel the job. - /// </summary> - public void Cancel() - { - _job.Cancel(); - } - - /// <summary> - /// Serialize the job to the stream. - /// </summary> - /// <param name="stream">Stream.</param> - /// <returns>True if successfull.</returns> - [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", - Justification = "User job can throw any exception")] - internal bool Serialize(IPortableStream stream) - { - ClusterGroupImpl prj = _ignite.ClusterGroup; - - PortableWriterImpl writer = prj.Marshaller.StartMarshal(stream); - - try - { - writer.Write(this); - - return true; - } - catch (Exception e) - { - writer.WriteString("Failed to marshal job [job=" + _job + ", errType=" + e.GetType().Name + - ", errMsg=" + e.Message + ']'); - - return false; - } - finally - { - // 4. Process metadata. - prj.FinishMarshal(writer); - } - } - - /// <summary> - /// Job. - /// </summary> - internal IComputeJob Job - { - get { return _job; } - } - - /// <summary> - /// Job result. - /// </summary> - internal ComputeJobResultImpl JobResult - { - get { return _jobRes; } - } - - /// <summary> - /// Internal job execution routine. - /// </summary> - /// <param name="cancel">Cancel flag.</param> - /// <param name="res">Result.</param> - /// <param name="success">Success flag.</param> - [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", - Justification = "User job can throw any exception")] - private void Execute0(bool cancel, out object res, out bool success) - { - // 1. Inject resources. - IComputeResourceInjector injector = _job as IComputeResourceInjector; - - if (injector != null) - injector.Inject(_ignite); - else - ResourceProcessor.Inject(_job, _ignite); - - // 2. Execute. 
- try - { - if (cancel) - _job.Cancel(); - - res = _job.Execute(); - - success = true; - } - catch (Exception e) - { - res = e; - - success = false; - } - } - - /** <inheritDoc /> */ - public void WritePortable(IPortableWriter writer) - { - PortableWriterImpl writer0 = (PortableWriterImpl) writer.RawWriter(); - - writer0.DetachNext(); - PortableUtils.WritePortableOrSerializable(writer0, _job); - } - - /// <summary> - /// Create job instance. - /// </summary> - /// <param name="grid">Grid.</param> - /// <param name="stream">Stream.</param> - /// <returns></returns> - internal static ComputeJobHolder CreateJob(Ignite grid, IPortableStream stream) - { - try - { - return grid.Marshaller.StartUnmarshal(stream).ReadObject<ComputeJobHolder>(); - } - catch (Exception e) - { - throw new IgniteException("Failed to deserialize the job [errType=" + e.GetType().Name + - ", errMsg=" + e.Message + ']'); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs deleted file mode 100644 index 8173f71..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Compute
-{
-    using System;
-    using Apache.Ignite.Core.Compute;
-
-    /// <summary>
-    /// Generic view over a non-generic (object-typed) job result; every member delegates to it.
-    /// </summary>
-    internal class ComputeJobResultGenericWrapper<T> : IComputeJobResult<T>
-    {
-        /** Wrapped non-generic result. */
-        private readonly IComputeJobResult<object> _inner;
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="ComputeJobResultGenericWrapper{T}"/> class.
-        /// </summary>
-        /// <param name="jobRes">The job result to wrap.</param>
-        public ComputeJobResultGenericWrapper(IComputeJobResult<object> jobRes)
-        {
-            _inner = jobRes;
-        }
-
-        /** <inheritdoc /> */
-        public T Data()
-        {
-            // The wrapped result exposes plain objects; cast to the requested type.
-            object raw = _inner.Data();
-
-            return (T) raw;
-        }
-
-        /** <inheritdoc /> */
-        public Exception Exception()
-        {
-            return _inner.Exception();
-        }
-
-        /** <inheritdoc /> */
-        public IComputeJob<T> Job()
-        {
-            var wrappedJob = _inner.Job();
-
-            return wrappedJob.Unwrap<object, T>();
-        }
-
-        /** <inheritdoc /> */
-        public Guid NodeId
-        {
-            get
-            {
-                return _inner.NodeId;
-            }
-        }
-
-        /** <inheritdoc /> */
-        public bool Cancelled
-        {
-            get
-            {
-                return _inner.Cancelled;
-            }
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs
deleted file mode 100644 index a35bae0..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using Apache.Ignite.Core.Compute; - - /// <summary> - /// Job result implementation. - /// </summary> - internal class ComputeJobResultImpl : IComputeJobResult<object> - { - /** Data. */ - private readonly object _data; - - /** Exception. */ - private readonly Exception _err; - - /** Backing job. */ - private readonly IComputeJob _job; - - /** Node ID. */ - private readonly Guid _nodeId; - - /** Cancel flag. */ - private readonly bool _cancelled; - - /// <summary> - /// Constructor. 
- /// </summary> - /// <param name="data">Data.</param> - /// <param name="err">Exception.</param> - /// <param name="job">Backing job.</param> - /// <param name="nodeId">Node ID.</param> - /// <param name="cancelled">Cancel flag.</param> - public ComputeJobResultImpl(object data, Exception err, IComputeJob job, Guid nodeId, bool cancelled) - { - _data = data; - _err = err; - _job = job; - _nodeId = nodeId; - _cancelled = cancelled; - } - - /** <inheritDoc /> */ - public object Data() - { - return _data; - } - - /** <inheritDoc /> */ - public Exception Exception() - { - return _err; - } - - /** <inheritDoc /> */ - public IComputeJob<object> Job() - { - return _job; - } - - /** <inheritDoc /> */ - public Guid NodeId - { - get - { - return _nodeId; - } - } - - /** <inheritDoc /> */ - public bool Cancelled - { - get - { - return _cancelled; - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs deleted file mode 100644 index dda04b6..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Diagnostics; - using System.Reflection; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Portable; - using Apache.Ignite.Core.Resource; - - /// <summary> - /// Non-generic version of IComputeFunc{T}. - /// </summary> - internal interface IComputeOutFunc : IComputeFunc<object> - { - // No-op. - } - - /// <summary> - /// Wraps generic func into a non-generic for internal usage. - /// </summary> - internal class ComputeOutFuncWrapper : IComputeOutFunc, IPortableWriteAware - { - /** */ - private readonly object _func; - - /** */ - private readonly Func<object, object> _invoker; - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class. 
- /// </summary> - /// <param name="func">The function to wrap.</param> - /// <param name="invoker">The function invoker.</param> - public ComputeOutFuncWrapper(object func, Func<object> invoker) - { - Debug.Assert(func != null); - Debug.Assert(invoker != null); - - _func = func; - - _invoker = target => invoker(); - } - - /** <inheritDoc /> */ - public object Invoke() - { - try - { - return _invoker(_func); - } - catch (TargetInvocationException ex) - { - throw ex.InnerException; - } - } - - /** <inheritDoc /> */ - public void WritePortable(IPortableWriter writer) - { - var writer0 = (PortableWriterImpl)writer.RawWriter(); - - writer0.DetachNext(); - PortableUtils.WritePortableOrSerializable(writer0, _func); - } - - /// <summary> - /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public ComputeOutFuncWrapper(IPortableReader reader) - { - var reader0 = (PortableReaderImpl)reader.RawReader(); - - _func = PortableUtils.ReadPortableOrSerializable<object>(reader0); - - _invoker = DelegateTypeDescriptor.GetComputeOutFunc(_func.GetType()); - } - - /// <summary> - /// Injects the grid. - /// </summary> - [InstanceResource] - public void InjectIgnite(IIgnite ignite) - { - // Propagate injection - ResourceProcessor.Inject(_func, (IgniteProxy)ignite); - } - } - - /// <summary> - /// Extension methods for IComputeOutFunc{T}. - /// </summary> - internal static class ComputeOutFuncExtensions - { - /// <summary> - /// Convert to non-generic wrapper. - /// </summary> - public static IComputeOutFunc ToNonGeneric<T>(this IComputeFunc<T> func) - { - return new ComputeOutFuncWrapper(func, () => func.Invoke()); - } - } -}
