http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs new file mode 100644 index 0000000..789e1c4 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -0,0 +1,645 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using System.Runtime.Serialization;
+    using System.Threading;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Cluster;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Compute.Closure;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Compute implementation backed by an unmanaged target and bound to a single
+    /// cluster group (projection).
+    /// </summary>
+    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+    internal class ComputeImpl : PlatformTarget
+    {
+        /** Operation code used by AffinityRun / AffinityCall. */
+        private const int OpAffinity = 1;
+
+        /** Operation code used for closure broadcast. */
+        private const int OpBroadcast = 2;
+
+        /** Operation code used by ExecuteJavaTask (synchronous Java task execution). */
+        private const int OpExec = 3;
+
+        /** Operation code used by ExecuteJavaTaskAsync (asynchronous Java task execution). */
+        private const int OpExecAsync = 4;
+
+        /** Operation code used for non-broadcast closure execution; default op of ExecuteClosures0. */
+        private const int OpUnicast = 5;
+
+        /** Underlying projection. */
+        private readonly ClusterGroupImpl _prj;
+
+        /** Whether objects must be kept portable. The value is stored per thread and defaults to false. */
+        private readonly ThreadLocal<bool> _keepPortable = new ThreadLocal<bool>(() => false);
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="prj">Projection.</param>
+        /// <param name="keepPortable">"keepPortable" flag.</param>
+        public ComputeImpl(IUnmanagedTarget target, PortableMarshaller marsh, ClusterGroupImpl prj, bool keepPortable)
+            : base(target, marsh)
+        {
+            _prj = prj;
+
+            // NOTE(review): the flag lives in a ThreadLocal, so this assignment only affects the
+            // thread that runs the constructor; other threads see the default (false) - confirm intended.
+            _keepPortable.Value = keepPortable;
+        }
+
+        /// <summary>
+        /// Grid projection to which this compute instance belongs.
+        /// </summary>
+        public IClusterGroup ClusterGroup
+        {
+            get { return _prj; }
+        }
+
+        /// <summary>
+        /// Marks the next task executed on this projection from the current thread as no-failover:
+        /// its jobs are never failed over, even if a remote node crashes or rejects execution.
+        /// The flag is reset when that task starts, so subsequent tasks use the default
+        /// failover policy unless the flag is set again.
+        /// </summary>
+        public void WithNoFailover()
+        {
+            UU.ComputeWithNoFailover(Target);
+        }
+
+        /// <summary>
+        /// Sets the timeout for the next task executed on this projection from the current thread.
+        /// The timeout is reset when that task starts, so it applies to a single task only.
+        /// </summary>
+        /// <param name="timeout">Computation timeout in milliseconds.</param>
+        public void WithTimeout(long timeout)
+        {
+            UU.ComputeWithTimeout(Target, timeout);
+        }
+
+        /// <summary>
+        /// Turns on the keep-portable flag for the next Java task executed on this projection from the
+        /// current thread, so that the task argument passed to Java and the returned task results
+        /// are not deserialized.
+        /// </summary>
+        public void WithKeepPortable()
+        {
+            _keepPortable.Value = true;
+        }
+
+        /// <summary>
+        /// Runs the given Java task on the grid projection and returns its result. When no task is
+        /// deployed under the given name, 'taskName' is used as the task class name for auto-deployment.
+        /// </summary>
+        public T ExecuteJavaTask<T>(string taskName, object taskArg)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
+
+            // Node list is resolved only when the projection has a predicate; otherwise null is written.
+            ICollection<IClusterNode> targetNodes = null;
+
+            if (_prj.Predicate != null)
+                targetNodes = _prj.Nodes();
+
+            try
+            {
+                return DoOutInOp<T>(OpExec, w => WriteTask(w, taskName, taskArg, targetNodes));
+            }
+            finally
+            {
+                // The keep-portable flag is valid for one task only.
+                _keepPortable.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Executes given Java task asynchronously on the grid projection.
+        /// If task for given name has not been deployed yet,
+        /// then 'taskName' will be used as task class name to auto-deploy the task.
+        /// </summary>
+        public IFuture<T> ExecuteJavaTaskAsync<T>(string taskName, object taskArg)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName");
+
+            // Node list is resolved only when the projection has a predicate; otherwise null is written.
+            ICollection<IClusterNode> targetNodes = null;
+
+            if (_prj.Predicate != null)
+                targetNodes = _prj.Nodes();
+
+            try
+            {
+                IFuture<T> result = null;
+
+                DoOutInOp(
+                    OpExecAsync,
+                    w => WriteTask(w, taskName, taskArg, targetNodes),
+                    input =>
+                    {
+                        // The keep-portable flag is read here, before the finally block resets it.
+                        result = GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepPortable.Value);
+                    });
+
+                return result;
+            }
+            finally
+            {
+                // The keep-portable flag is valid for one task only.
+                _keepPortable.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Starts the given task on the grid projection. See <see cref="IComputeTask{A,T,R}"/>
+        /// documentation for a step-by-step description of the task execution process.
+        /// </summary>
+        /// <param name="task">Task to execute.</param>
+        /// <param name="taskArg">Optional task argument.</param>
+        /// <returns>Future of the task result.</returns>
+        public IFuture<TR> Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+        {
+            IgniteArgumentCheck.NotNull(task, "task");
+
+            var taskHolder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, taskArg);
+
+            // The holder is registered so that the native side can refer to it by handle.
+            long taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(taskHolder);
+
+            UU.ComputeExecuteNative(Target, taskHandle, _prj.TopologyVersion);
+
+            return taskHolder.Future;
+        }
+
+        /// <summary>
+        /// Executes given task on the grid projection. For step-by-step explanation of task execution process
+        /// refer to <see cref="IComputeTask{A,T,R}"/> documentation.
+        /// </summary>
+        /// <param name="taskType">Task type.</param>
+        /// <param name="taskArg">Optional task argument.</param>
+        /// <returns>Task result.</returns>
+        public IFuture<TR> Execute<TA, T, TR>(Type taskType, TA taskArg)
+        {
+            IgniteArgumentCheck.NotNull(taskType, "taskType");
+
+            // The task instance is created without running any constructor.
+            var typedTask = FormatterServices.GetUninitializedObject(taskType) as IComputeTask<TA, T, TR>;
+
+            if (typedTask != null)
+                return Execute(typedTask, taskArg);
+
+            throw new IgniteException("Task type doesn't implement IComputeTask: " + taskType.Name);
+        }
+
+        /// <summary>
+        /// Runs the provided job on a node of this grid projection and returns the job result.
+        /// </summary>
+        /// <param name="clo">Job to execute.</param>
+        /// <returns>Job result for this execution.</returns>
+        public IFuture<TR> Execute<TR>(IComputeFunc<TR> clo)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            var task = new ComputeSingleClosureTask<object, TR, TR>();
+            var job = new ComputeOutFuncJob(clo.ToNonGeneric());
+
+            return ExecuteClosures0(task, job, null, false);
+        }
+
+        /// <summary>
+        /// Runs the provided delegate on a node of this grid projection and returns its result.
+        /// </summary>
+        /// <param name="func">Func to execute.</param>
+        /// <returns>Job result for this execution.</returns>
+        public IFuture<TR> Execute<TR>(Func<TR> func)
+        {
+            IgniteArgumentCheck.NotNull(func, "func");
+
+            var task = new ComputeSingleClosureTask<object, TR, TR>();
+            var job = new ComputeOutFuncJob(new ComputeOutFuncWrapper(func, () => func()));
+
+            return ExecuteClosures0(task, job, null, false);
+        }
+
+        /// <summary>
+        /// Executes collection of jobs on nodes within this grid projection.
+        /// </summary>
+        /// <param name="clos">Collection of jobs to execute.</param>
+        /// <returns>Collection of job results for this execution.</returns>
+        public IFuture<ICollection<TR>> Execute<TR>(IEnumerable<IComputeFunc<TR>> clos)
+        {
+            IgniteArgumentCheck.NotNull(clos, "clos");
+
+            // Pre-size the list when the input exposes its count.
+            var jobList = new List<IComputeJob>(GetCountOrZero(clos));
+
+            foreach (var func in clos)
+            {
+                jobList.Add(new ComputeOutFuncJob(func.ToNonGeneric()));
+            }
+
+            var task = new ComputeMultiClosureTask<object, TR, ICollection<TR>>(jobList.Count);
+
+            return ExecuteClosures0(task, null, jobList, false);
+        }
+
+        /// <summary>
+        /// Runs a collection of jobs on nodes of this grid projection and reduces their results.
+        /// </summary>
+        /// <param name="clos">Collection of jobs to execute.</param>
+        /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+        /// <returns>Reduced result for this execution.</returns>
+        public IFuture<TR2> Execute<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+        {
+            IgniteArgumentCheck.NotNull(clos, "clos");
+
+            // Pre-size the list when the input exposes its count.
+            var jobList = new List<IComputeJob>(GetCountOrZero(clos));
+
+            foreach (IComputeFunc<TR1> func in clos)
+            {
+                jobList.Add(new ComputeOutFuncJob(func.ToNonGeneric()));
+            }
+
+            var task = new ComputeReducingClosureTask<object, TR1, TR2>(rdc);
+
+            return ExecuteClosures0(task, null, jobList, false);
+        }
+
+        /// <summary>
+        /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result.
+        /// </summary>
+        /// <param name="clo">Job to broadcast to all projection nodes.</param>
+        /// <returns>Collection of results for this execution.</returns>
+        public IFuture<ICollection<TR>> Broadcast<TR>(IComputeFunc<TR> clo)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            var task = new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1);
+            var job = new ComputeOutFuncJob(clo.ToNonGeneric());
+
+            return ExecuteClosures0(task, job, null, true);
+        }
+
+        /// <summary>
+        /// Broadcasts the given closure job, together with its argument, to all nodes in the grid
+        /// projection. Every participating node returns a job result.
+        /// </summary>
+        /// <param name="clo">Job to broadcast to all projection nodes.</param>
+        /// <param name="arg">Job closure argument.</param>
+        /// <returns>Collection of results for this execution.</returns>
+        public IFuture<ICollection<TR>> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            var task = new ComputeMultiClosureTask<object, TR, ICollection<TR>>(1);
+            var job = new ComputeFuncJob(clo.ToNonGeneric(), arg);
+
+            return ExecuteClosures0(task, job, null, true);
+        }
+
+        /// <summary>
+        /// Broadcasts the given action to all nodes in the grid projection.
+        /// </summary>
+        /// <param name="action">Job to broadcast to all projection nodes.</param>
+        public IFuture<object> Broadcast(IComputeAction action)
+        {
+            IgniteArgumentCheck.NotNull(action, "action");
+
+            var task = new ComputeSingleClosureTask<object, object, object>();
+            var job = new ComputeActionJob(action);
+
+            return ExecuteClosures0(task, job, opId: OpBroadcast);
+        }
+
+        /// <summary>
+        /// Runs the provided action on a node of this grid projection.
+        /// </summary>
+        /// <param name="action">Job to execute.</param>
+        public IFuture<object> Run(IComputeAction action)
+        {
+            IgniteArgumentCheck.NotNull(action, "action");
+
+            var task = new ComputeSingleClosureTask<object, object, object>();
+            var job = new ComputeActionJob(action);
+
+            return ExecuteClosures0(task, job);
+        }
+
+        /// <summary>
+        /// Executes collection of jobs on Ignite nodes within this grid projection.
+        /// </summary>
+        /// <param name="actions">Jobs to execute.</param>
+        public IFuture<object> Run(IEnumerable<IComputeAction> actions)
+        {
+            IgniteArgumentCheck.NotNull(actions, "actions");
+
+            var sizedActions = actions as ICollection;
+
+            if (sizedActions != null)
+            {
+                // Count is known up front, so the jobs are wrapped lazily while being written.
+                var lazyJobs = actions.Select(a => new ComputeActionJob(a));
+
+                return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: lazyJobs,
+                    jobsCount: sizedActions.Count);
+            }
+
+            // Count is unknown: materialize the jobs first to learn how many there are.
+            var jobList = actions.Select(a => new ComputeActionJob(a)).ToList();
+
+            return ExecuteClosures0(new ComputeSingleClosureTask<object, object, object>(), jobs: jobList,
+                jobsCount: jobList.Count);
+        }
+
+        /// <summary>
+        /// Runs the provided closure job with the given argument on a node of this grid projection.
+        /// </summary>
+        /// <param name="clo">Job to run.</param>
+        /// <param name="arg">Job argument.</param>
+        /// <returns>Job result for this execution.</returns>
+        public IFuture<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            var task = new ComputeSingleClosureTask<T, TR, TR>();
+            var job = new ComputeFuncJob(clo.ToNonGeneric(), arg);
+
+            return ExecuteClosures0(task, job, null, false);
+        }
+
+        /// <summary>
+        /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+        /// every argument in the passed in collection. The number of actual job executions will be
+        /// equal to size of the job arguments collection.
+        /// </summary>
+        /// <param name="clo">Job to run.</param>
+        /// <param name="args">Job arguments.</param>
+        /// <returns>Collection of job results.</returns>
+        public IFuture<ICollection<TR>> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+        {
+            // Validate every argument up front so that a null collection is reported as an
+            // argument error here instead of failing inside the loop below.
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            IgniteArgumentCheck.NotNull(args, "args");
+
+            var jobs = new List<IComputeJob>(GetCountOrZero(args));
+
+            // The closure is converted once and shared by all jobs.
+            var func = clo.ToNonGeneric();
+
+            foreach (T arg in args)
+                jobs.Add(new ComputeFuncJob(func, arg));
+
+            return ExecuteClosures0(new ComputeMultiClosureTask<T, TR, ICollection<TR>>(jobs.Count),
+                null, jobs, false);
+        }
+
+        /// <summary>
+        /// Executes provided closure job on nodes within this grid projection. A new job is executed for
+        /// every argument in the passed in collection. The number of actual job executions will be
+        /// equal to size of the job arguments collection. The returned job results will be reduced
+        /// into an individual result by provided reducer.
+        /// </summary>
+        /// <param name="clo">Job to run.</param>
+        /// <param name="args">Job arguments.</param>
+        /// <param name="rdc">Reducer to reduce all job results into one individual return value.</param>
+        /// <returns>Reduced job result for this execution.</returns>
+        public IFuture<TR2> Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args,
+            IComputeReducer<TR1, TR2> rdc)
+        {
+            // Validate all three arguments, each under its own name.
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            IgniteArgumentCheck.NotNull(args, "args");
+
+            IgniteArgumentCheck.NotNull(rdc, "rdc");
+
+            ICollection<IComputeJob> jobs = new List<IComputeJob>(GetCountOrZero(args));
+
+            // The closure is converted once and shared by all jobs.
+            var func = clo.ToNonGeneric();
+
+            foreach (T arg in args)
+                jobs.Add(new ComputeFuncJob(func, arg));
+
+            return ExecuteClosures0(new ComputeReducingClosureTask<T, TR1, TR2>(rdc),
+                null, jobs, false);
+        }
+
+        /// <summary>
+        /// Executes given job on the node where data for provided affinity key is located
+        /// (a.k.a. affinity co-location).
+        /// </summary>
+        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+        /// <param name="affinityKey">Affinity key.</param>
+        /// <param name="action">Job to execute.</param>
+        public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+        {
+            IgniteArgumentCheck.NotNull(action, "action");
+
+            var task = new ComputeSingleClosureTask<object, object, object>();
+            var job = new ComputeActionJob(action);
+
+            // Cache name and key are appended after the job data.
+            Action<PortableWriterImpl> affinityWriter = w => WriteAffinity(w, cacheName, affinityKey);
+
+            return ExecuteClosures0(task, job, opId: OpAffinity, writeAction: affinityWriter);
+        }
+
+        /// <summary>
+        /// Runs the given job on the node that holds the data for the provided affinity key
+        /// (a.k.a. affinity co-location) and returns the job result.
+        /// </summary>
+        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+        /// <param name="affinityKey">Affinity key.</param>
+        /// <param name="clo">Job to execute.</param>
+        /// <returns>Job result for this execution.</returns>
+        /// <typeparam name="TR">Type of job result.</typeparam>
+        public IFuture<TR> AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+        {
+            IgniteArgumentCheck.NotNull(clo, "clo");
+
+            var task = new ComputeSingleClosureTask<object, TR, TR>();
+            var job = new ComputeOutFuncJob(clo.ToNonGeneric());
+
+            // Cache name and key are appended after the job data.
+            Action<PortableWriterImpl> affinityWriter = w => WriteAffinity(w, cacheName, affinityKey);
+
+            return ExecuteClosures0(task, job, opId: OpAffinity, writeAction: affinityWriter);
+        }
+
+        /** <inheritDoc /> */
+        protected override T Unmarshal<T>(IPortableStream stream)
+        {
+            // The current thread's keep-portable flag is passed to the marshaller.
+            return Marshaller.Unmarshal<T>(stream, _keepPortable.Value);
+        }
+
+        /// <summary>
+        /// Internal routine for closure-based task execution.
+        /// </summary>
+        /// <param name="task">Task.</param>
+        /// <param name="job">Job.</param>
+        /// <param name="jobs">Jobs.</param>
+        /// <param name="broadcast">Broadcast flag.</param>
+        /// <returns>Future.</returns>
+        private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job,
+            ICollection<IComputeJob> jobs, bool broadcast)
+        {
+            // Maps the broadcast flag to an op code; the job count is 1 when no collection is given.
+            return ExecuteClosures0(task, job, jobs, broadcast ? OpBroadcast : OpUnicast,
+                jobs == null ? 1 : jobs.Count);
+        }
+
+        /// <summary>
+        /// Internal routine for closure-based task execution. Allocates handles for the task holder
+        /// and for every job, writes them through <c>DoOutOp</c> and returns the holder's future.
+        /// Failures are reported through that future rather than thrown.
+        /// </summary>
+        /// <param name="task">Task.</param>
+        /// <param name="job">Job. When not null, it is the only job written and <paramref name="jobs"/> is ignored.</param>
+        /// <param name="jobs">Jobs. Used only when <paramref name="job"/> is null.</param>
+        /// <param name="opId">Op code.</param>
+        /// <param name="jobsCount">Jobs count. Written as-is before the entries of <paramref name="jobs"/>.</param>
+        /// <param name="writeAction">Custom write action, invoked after all jobs have been written.</param>
+        /// <returns>Future.</returns>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User code can throw any exception")]
+        private IFuture<TR> ExecuteClosures0<TA, T, TR>(IComputeTask<TA, T, TR> task, IComputeJob job = null,
+            IEnumerable<IComputeJob> jobs = null, int opId = OpUnicast, int jobsCount = 0,
+            Action<PortableWriterImpl> writeAction = null)
+        {
+            Debug.Assert(job != null || jobs != null);
+
+            var holder = new ComputeTaskHolder<TA, T, TR>((Ignite) _prj.Ignite, this, task, default(TA));
+
+            var taskHandle = Marshaller.Ignite.HandleRegistry.Allocate(holder);
+
+            var jobHandles = new List<long>(job != null ? 1 : jobsCount);
+
+            try
+            {
+                Exception err = null;
+
+                try
+                {
+                    // Written layout: task handle, job count, then each job (see WriteJob),
+                    // then the optional custom payload.
+                    DoOutOp(opId, writer =>
+                    {
+                        writer.WriteLong(taskHandle);
+
+                        if (job != null)
+                        {
+                            writer.WriteInt(1);
+
+                            jobHandles.Add(WriteJob(job, writer));
+                        }
+                        else
+                        {
+                            writer.WriteInt(jobsCount);
+
+                            Debug.Assert(jobs != null, "jobs != null");
+
+                            // "jobs" is enumerated here; each element is written as it is produced.
+                            jobHandles.AddRange(jobs.Select(jobEntry => WriteJob(jobEntry, writer)));
+                        }
+
+                        holder.JobHandles(jobHandles);
+
+                        if (writeAction != null)
+                            writeAction(writer);
+                    });
+                }
+                catch (Exception e)
+                {
+                    // Any failure of the out-op (including user code run while writing) is captured here.
+                    err = e;
+                }
+
+                if (err != null)
+                {
+                    // Manual job handles release because they were not assigned to the task yet.
+                    // NOTE(review): holder.JobHandles(jobHandles) is called inside the write lambda, so a
+                    // failure after that call (e.g. in writeAction) reaches this point with the handles
+                    // already passed to the holder - confirm they cannot be released twice.
+                    foreach (var hnd in jobHandles)
+                        Marshaller.Ignite.HandleRegistry.Release(hnd);
+
+                    holder.CompleteWithError(taskHandle, err);
+                }
+            }
+            catch (Exception e)
+            {
+                // Out-op failures are already captured by the inner catch, so this handler is reached
+                // only when the cleanup above (handle release / CompleteWithError) throws.
+                holder.CompleteWithError(taskHandle, e);
+            }
+
+            return holder.Future;
+        }
+
+        /// <summary>
+        /// Writes the job: wraps it into a <c>ComputeJobHolder</c>, allocates a handle for the holder,
+        /// then writes the handle followed by the holder itself.
+        /// </summary>
+        /// <param name="job">The job.</param>
+        /// <param name="writer">The writer.</param>
+        /// <returns>Handle to the job holder</returns>
+        private long WriteJob(IComputeJob job, PortableWriterImpl writer)
+        {
+            var jobHolder = new ComputeJobHolder(_prj.Ignite as Ignite, job);
+
+            var jobHandle = Marshaller.Ignite.HandleRegistry.Allocate(jobHolder);
+
+            writer.WriteLong(jobHandle);
+            writer.WriteObject(jobHolder);
+
+            return jobHandle;
+        }
+
+        /// <summary>
+        /// Write task to the writer: task name, the current thread's keep-portable flag,
+        /// the task argument and finally the node IDs.
+        /// </summary>
+        /// <param name="writer">Writer.</param>
+        /// <param name="taskName">Task name.</param>
+        /// <param name="taskArg">Task arg.</param>
+        /// <param name="nodes">Nodes.</param>
+        private void WriteTask(PortableWriterImpl writer, string taskName, object taskArg,
+            ICollection<IClusterNode> nodes)
+        {
+            writer.WriteString(taskName);
+            writer.WriteBoolean(_keepPortable.Value);
+            writer.Write(taskArg);
+
+            WriteNodeIds(writer, nodes);
+        }
+
+        /// <summary>
+        /// Write node IDs.
+        /// </summary>
+        /// <param name="writer">Writer.</param>
+        /// <param name="nodes">Nodes.</param>
+        private static void WriteNodeIds(PortableWriterImpl writer, ICollection<IClusterNode> nodes)
+        {
+            // A leading flag tells whether a node list follows.
+            var hasNodes = nodes != null;
+
+            writer.WriteBoolean(hasNodes);
+
+            if (!hasNodes)
+                return;
+
+            writer.WriteInt(nodes.Count);
+
+            foreach (var node in nodes)
+            {
+                writer.WriteGuid(node.Id);
+            }
+        }
+
+        /// <summary>
+        /// Writes affinity routing data: the cache name followed by the affinity key.
+        /// </summary>
+        /// <param name="writer">The writer.</param>
+        /// <param name="cacheName">Name of the cache to use for affinity co-location.</param>
+        /// <param name="affinityKey">Affinity key.</param>
+        private static void WriteAffinity(PortableWriterImpl writer, string cacheName, object affinityKey)
+        {
+            writer.WriteString(cacheName);
+            writer.WriteObject(affinityKey);
+        }
+
+        /// <summary>
+        /// Returns the element count when the object is a non-generic <c>ICollection</c>; zero otherwise.
+        /// </summary>
+        private static int GetCountOrZero(object collection)
+        {
+            var sized = collection as ICollection;
+
+            if (sized == null)
+                return 0;
+
+            return sized.Count;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
new file mode 100644
index 0000000..f4ed999
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJob.cs
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Reflection;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Resource;
+
+    /// <summary>
+    /// Non-generic version of IComputeJob{T}: a job whose result is exposed as <c>object</c>.
+    /// </summary>
+    internal interface IComputeJob : IComputeJob<object>
+    {
+        // No-op: the interface declares no members of its own.
+    }
+
+    /// <summary>
+    /// Wraps generic func into a non-generic for internal usage.
+    /// </summary>
+    internal class ComputeJobWrapper : IComputeJob, IPortableWriteAware
+    {
+        /** Wrapped job instance. */
+        private readonly object _job;
+
+        /** Delegate which executes the wrapped job. */
+        private readonly Func<object, object> _execute;
+
+        /** Delegate which cancels the wrapped job. */
+        private readonly Action<object> _cancel;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeJobWrapper"/> class by reading the
+        /// wrapped job from the reader and resolving its execute/cancel delegates from the job type.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public ComputeJobWrapper(IPortableReader reader)
+        {
+            var rawReader = (PortableReaderImpl)reader.RawReader();
+
+            _job = PortableUtils.ReadPortableOrSerializable<object>(rawReader);
+
+            DelegateTypeDescriptor.GetComputeJob(_job.GetType(), out _execute, out _cancel);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeJobWrapper" /> class from an existing
+        /// job and its delegates.
+        /// </summary>
+        public ComputeJobWrapper(object job, Func<object, object> execute, Action<object> cancel)
+        {
+            _job = job;
+            _execute = execute;
+            _cancel = cancel;
+        }
+
+        /// <summary>
+        /// Gets the inner job.
+        /// </summary>
+        public object Job
+        {
+            get { return _job; }
+        }
+
+        /** <inheritDoc /> */
+        public object Execute()
+        {
+            try
+            {
+                return _execute(_job);
+            }
+            catch (TargetInvocationException invocationError)
+            {
+                // Surface the exception thrown by the job rather than the reflection wrapper.
+                throw invocationError.InnerException;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void Cancel()
+        {
+            try
+            {
+                _cancel(_job);
+            }
+            catch (TargetInvocationException invocationError)
+            {
+                // Surface the exception thrown by the job rather than the reflection wrapper.
+                throw invocationError.InnerException;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var rawWriter = (PortableWriterImpl)writer.RawWriter();
+
+            rawWriter.DetachNext();
+            PortableUtils.WritePortableOrSerializable(rawWriter, _job);
+        }
+
+        /// <summary>
+        /// Forwards the injected Ignite instance to the wrapped job.
+        /// </summary>
+        [InstanceResource]
+        public void InjectIgnite(IIgnite ignite)
+        {
+            // Propagate injection
+            ResourceProcessor.Inject(_job, (IgniteProxy)ignite);
+        }
+    }
+
+    /// <summary>
+    /// Extension methods for IComputeJob{T}.
+    /// </summary>
+    internal static class ComputeJobExtensions
+    {
+        /// <summary>
+        /// Wraps a generic job into a non-generic wrapper whose delegates call the job directly.
+        /// </summary>
+        public static IComputeJob ToNonGeneric<T>(this IComputeJob<T> job)
+        {
+            return new ComputeJobWrapper(job, x => job.Execute(), x => job.Cancel());
+        }
+
+        /// <summary>
+        /// Returns the job as a job of another result type. When the job is a
+        /// <see cref="ComputeJobWrapper"/>, the wrapped job is cast instead of the wrapper.
+        /// </summary>
+        public static IComputeJob<TR> Unwrap<T, TR>(this IComputeJob<T> job)
+        {
+            var wrapper = job as ComputeJobWrapper;
+
+            if (wrapper == null)
+                return (IComputeJob<TR>) job;
+
+            return (IComputeJob<TR>) wrapper.Job;
+        }
+
+        /// <summary>
+        /// Returns the wrapped job when the job is a <see cref="ComputeJobWrapper"/>;
+        /// otherwise returns the job itself.
+        /// </summary>
+        public static object Unwrap(this IComputeJob<object> job)
+        {
+            var wrapper = job as ComputeJobWrapper;
+
+            if (wrapper == null)
+                return job;
+
+            return wrapper.Job;
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
new file mode 100644
index 0000000..9bdb5cf
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobHolder.cs
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Diagnostics.CodeAnalysis;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Cluster;
+    using Apache.Ignite.Core.Impl.Compute.Closure;
+    using Apache.Ignite.Core.Impl.Memory;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// Holder for user-provided compute job. Pairs the job with the owning grid and knows how
+    /// to execute, cancel and serialize it.
+    /// </summary>
+    internal class ComputeJobHolder : IPortableWriteAware
+    {
+        /** Actual job. */
+        private readonly IComputeJob _job;
+
+        /** Owning grid. */
+        private readonly Ignite _ignite;
+
+        /** Result (set for local jobs only). Assigned by ExecuteLocal. */
+        private volatile ComputeJobResultImpl _jobRes;
+
+        /// <summary>
+        /// Default ctor for marshalling. Takes the grid from the reader's marshaller and reads
+        /// the job from the raw reader.
+        /// </summary>
+        /// <param name="reader">Reader to restore the holder from.</param>
+        public ComputeJobHolder(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl) reader.RawReader();
+
+            _ignite = reader0.Marshaller.Ignite;
+
+            _job = PortableUtils.ReadPortableOrSerializable<IComputeJob>(reader0);
+        }
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="job">Job.</param>
+        public ComputeJobHolder(Ignite grid, IComputeJob job)
+        {
+            _ignite = grid;
+            _job = job;
+        }
+
+        /// <summary>
+        /// Executes local job.
+        /// </summary>
+        /// <param name="cancel">Cancel flag.</param>
+        public void ExecuteLocal(bool cancel)
+        {
+            object outcome;
+            bool succeeded;
+
+            Execute0(cancel, out outcome, out succeeded);
+
+            // On failure the outcome carries the exception instead of the data.
+            object data = succeeded ? outcome : null;
+            Exception error = succeeded ? null : outcome as Exception;
+
+            _jobRes = new ComputeJobResultImpl(data, error, _job, _ignite.LocalNode.Id, cancel);
+        }
+
+        /// <summary>
+        /// Executes the job and writes its outcome (result or error) to the stream.
+        /// </summary>
+        /// <param name="cancel">Whether the job must be cancelled.</param>
+        /// <param name="stream">Stream.</param>
+        public void ExecuteRemote(PlatformMemoryStream stream, bool cancel)
+        {
+            // Run the job first.
+            object outcome;
+            bool succeeded;
+
+            Execute0(cancel, out outcome, out succeeded);
+
+            // Then marshal the outcome through the grid's projection.
+            ClusterGroupImpl projection = _ignite.ClusterGroup;
+
+            PortableWriterImpl resultWriter = projection.Marshaller.StartMarshal(stream);
+
+            try
+            {
+                PortableUtils.WriteWrappedInvocationResult(resultWriter, succeeded, outcome);
+            }
+            finally
+            {
+                // Metadata is processed whether or not marshalling succeeded.
+                projection.FinishMarshal(resultWriter);
+            }
+        }
+
+        /// <summary>
+        /// Cancels the job.
+        /// </summary>
+        public void Cancel()
+        {
+            _job.Cancel();
+        }
+
+        /// <summary>
+        /// Writes this holder to the stream. On failure, an error message is written instead.
+        /// </summary>
+        /// <param name="stream">Stream.</param>
+        /// <returns>True if successful.</returns>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User job can throw any exception")]
+        internal bool Serialize(IPortableStream stream)
+        {
+            ClusterGroupImpl projection = _ignite.ClusterGroup;
+
+            PortableWriterImpl holderWriter = projection.Marshaller.StartMarshal(stream);
+
+            try
+            {
+                holderWriter.Write(this);
+
+                return true;
+            }
+            catch (Exception e)
+            {
+                holderWriter.WriteString("Failed to marshal job [job=" + _job + ", errType=" + e.GetType().Name +
+                    ", errMsg=" + e.Message + ']');
+
+                return false;
+            }
+            finally
+            {
+                // Metadata is processed whether or not marshalling succeeded.
+                projection.FinishMarshal(holderWriter);
+            }
+        }
+
+        /// <summary>
+        /// Job.
+        /// </summary>
+        internal IComputeJob Job
+        {
+            get { return _job; }
+        }
+
+        /// <summary>
+        /// Result of the local execution, as stored by ExecuteLocal.
+        /// </summary>
+        internal ComputeJobResultImpl JobResult
+        {
+            get { return _jobRes; }
+        }
+
+        /// <summary>
+        /// Injects resources into the job and runs it, capturing any exception thrown by the job.
+        /// </summary>
+        /// <param name="cancel">Cancel flag.</param>
+        /// <param name="res">Job result on success; the caught exception on failure.</param>
+        /// <param name="success">Success flag.</param>
+        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "User job can throw any exception")]
+        private void Execute0(bool cancel, out object res, out bool success)
+        {
+            // Resource injection happens outside the try block, so its failures propagate to the caller.
+            var selfInjecting = _job as IComputeResourceInjector;
+
+            if (selfInjecting == null)
+                ResourceProcessor.Inject(_job, _ignite);
+            else
+                selfInjecting.Inject(_ignite);
+
+            try
+            {
+                // A cancelled job is notified first and then still executed.
+                if (cancel)
+                {
+                    _job.Cancel();
+                }
+
+                res = _job.Execute();
+                success = true;
+            }
+            catch (Exception jobError)
+            {
+                res = jobError;
+                success = false;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var rawWriter = (PortableWriterImpl) writer.RawWriter();
+
+            rawWriter.DetachNext();
+            PortableUtils.WritePortableOrSerializable(rawWriter, _job);
+        }
+
+        /// <summary>
+        /// Create job instance.
+ /// </summary> + /// <param name="grid">Grid.</param> + /// <param name="stream">Stream.</param> + /// <returns>Job holder read from the stream.</returns> + /// <exception cref="IgniteException">Thrown when reading the holder fails; the original + /// exception is attached as the inner exception.</exception> + internal static ComputeJobHolder CreateJob(Ignite grid, IPortableStream stream) + { + try + { + return grid.Marshaller.StartUnmarshal(stream).ReadObject<ComputeJobHolder>(); + } + catch (Exception e) + { + // Keep the original exception as the inner exception so its stack trace and type are not lost. + throw new IgniteException("Failed to deserialize the job [errType=" + e.GetType().Name + + ", errMsg=" + e.Message + ']', e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs new file mode 100644 index 0000000..8173f71 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultGenericWrapper.cs @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Adapts an untyped <c>IComputeJobResult{object}</c> to the typed <c>IComputeJobResult{T}</c> form. + /// Every member delegates to the wrapped result. + /// </summary> + internal class ComputeJobResultGenericWrapper<T> : IComputeJobResult<T> + { + /** Wrapped untyped result. */ + private readonly IComputeJobResult<object> _wrappedRes; + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeJobResultGenericWrapper{T}"/> class. + /// </summary> + /// <param name="jobRes">The job result to wrap.</param> + public ComputeJobResultGenericWrapper(IComputeJobResult<object> jobRes) + { + _wrappedRes = jobRes; + } + + /** <inheritdoc /> */ + public T Data() + { + // Direct cast: the wrapped data must actually be convertible to T, otherwise the cast throws. + return (T)_wrappedRes.Data(); + } + + /** <inheritdoc /> */ + public Exception Exception() + { + return _wrappedRes.Exception(); + } + + /** <inheritdoc /> */ + public IComputeJob<T> Job() + { + return _wrappedRes.Job().Unwrap<object, T>(); + } + + /** <inheritdoc /> */ + public Guid NodeId + { + get { return _wrappedRes.NodeId; } + } + + /** <inheritdoc /> */ + public bool Cancelled + { + get { return _wrappedRes.Cancelled; } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs new file mode 100644 index 0000000..a35bae0 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeJobResultImpl.cs @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using Apache.Ignite.Core.Compute; + + /// <summary> + /// Job result implementation. All fields are read-only and assigned once in the constructor. + /// </summary> + internal class ComputeJobResultImpl : IComputeJobResult<object> + { + /** Data. */ + private readonly object _data; + + /** Exception. */ + private readonly Exception _err; + + /** Backing job. */ + private readonly IComputeJob _job; + + /** Node ID. */ + private readonly Guid _nodeId; + + /** Cancel flag. */ + private readonly bool _cancelled; + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="data">Data returned by <see cref="Data"/>.</param> + /// <param name="err">Exception returned by <see cref="Exception"/>.</param> + /// <param name="job">Backing job.</param> + /// <param name="nodeId">Node ID.</param> + /// <param name="cancelled">Cancel flag.</param> + public ComputeJobResultImpl(object data, Exception err, IComputeJob job, Guid nodeId, bool cancelled) + { + _data = data; + _err = err; + _job = job; + _nodeId = nodeId; + _cancelled = cancelled; + } + + /** <inheritDoc /> */ + public object Data() + { + return _data; + } + + /** <inheritDoc /> */ + public Exception Exception() + { + return _err; + } + + /** <inheritDoc /> */ + public IComputeJob<object> Job() + { + return _job; + } + + /** <inheritDoc /> */ + public Guid NodeId + { + get + { + return _nodeId; + } + } + + /** <inheritDoc /> */ + public bool Cancelled + { + get + { + return _cancelled; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs new file mode 100644 index 0000000..dda04b6 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeOutFunc.cs @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using System.Diagnostics; + using System.Reflection; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Resource; + using Apache.Ignite.Core.Portable; + using Apache.Ignite.Core.Resource; + + /// <summary> + /// Non-generic version of IComputeFunc{T}. + /// </summary> + internal interface IComputeOutFunc : IComputeFunc<object> + { + // No-op. + } + + /// <summary> + /// Wraps generic func into a non-generic for internal usage. + /// </summary> + internal class ComputeOutFuncWrapper : IComputeOutFunc, IPortableWriteAware + { + /** Wrapped user function. */ + private readonly object _func; + + /** Delegate that invokes the wrapped function. */ + private readonly Func<object, object> _invoker; + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper" /> class. 
+ /// </summary> + /// <param name="func">The function to wrap.</param> + /// <param name="invoker">The function invoker.</param> + public ComputeOutFuncWrapper(object func, Func<object> invoker) + { + Debug.Assert(func != null); + Debug.Assert(invoker != null); + + _func = func; + + // Adapt the parameterless invoker to the Func<object, object> field; the target argument is ignored. + _invoker = target => invoker(); + } + + /** <inheritDoc /> */ + public object Invoke() + { + try + { + return _invoker(_func); + } + catch (TargetInvocationException ex) + { + // Surface the wrapped exception instead of the reflection wrapper. + throw ex.InnerException; + } + } + + /** <inheritDoc /> */ + public void WritePortable(IPortableWriter writer) + { + var writer0 = (PortableWriterImpl)writer.RawWriter(); + + // Only the wrapped function is written; the invoker is rebuilt by the reader constructor. + writer0.DetachNext(); + PortableUtils.WritePortableOrSerializable(writer0, _func); + } + + /// <summary> + /// Initializes a new instance of the <see cref="ComputeOutFuncWrapper"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public ComputeOutFuncWrapper(IPortableReader reader) + { + var reader0 = (PortableReaderImpl)reader.RawReader(); + + _func = PortableUtils.ReadPortableOrSerializable<object>(reader0); + + // The invoker is looked up from the runtime type of the function that was read. + _invoker = DelegateTypeDescriptor.GetComputeOutFunc(_func.GetType()); + } + + /// <summary> + /// Injects the grid. + /// </summary> + /// <param name="ignite">Ignite instance; it is cast to IgniteProxy before being passed on.</param> + [InstanceResource] + public void InjectIgnite(IIgnite ignite) + { + // Propagate injection + ResourceProcessor.Inject(_func, (IgniteProxy)ignite); + } + } + + /// <summary> + /// Extension methods for IComputeFunc{T}. + /// </summary> + internal static class ComputeOutFuncExtensions + { + /// <summary> + /// Convert to non-generic wrapper. 
+ /// </summary> + /// <param name="func">Generic function to wrap.</param> + /// <returns>Wrapper whose invoker calls <c>func.Invoke()</c>.</returns> + public static IComputeOutFunc ToNonGeneric<T>(this IComputeFunc<T> func) + { + return new ComputeOutFuncWrapper(func, () => func.Invoke()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs new file mode 100644 index 0000000..dfe0d18 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +namespace Apache.Ignite.Core.Impl.Compute +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using System.Linq; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Cluster; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Compute.Closure; + using Apache.Ignite.Core.Impl.Memory; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Resource; + + /// <summary> + /// Compute task holder interface used to avoid generics. + /// </summary> + internal interface IComputeTaskHolder + { + /// <summary> + /// Perform map step. The produced jobs are written to the output stream; nothing is returned. + /// </summary> + /// <param name="inStream">Stream with IN data (topology info).</param> + /// <param name="outStream">Stream for OUT data (map result).</param> + void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream); + + /// <summary> + /// Process local job result. + /// </summary> + /// <param name="jobId">Job holder.</param> + /// <returns>Policy.</returns> + int JobResultLocal(ComputeJobHolder jobId); + + /// <summary> + /// Process remote job result. + /// </summary> + /// <param name="jobId">Job holder.</param> + /// <param name="stream">Stream.</param> + /// <returns>Policy.</returns> + int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream); + + /// <summary> + /// Perform task reduce. + /// </summary> + void Reduce(); + + /// <summary> + /// Complete task. + /// </summary> + /// <param name="taskHandle">Task handle.</param> + void Complete(long taskHandle); + + /// <summary> + /// Complete task with error. 
+ /// </summary> + /// <param name="taskHandle">Task handle.</param> + /// <param name="stream">Stream with serialized exception.</param> + void CompleteWithError(long taskHandle, PlatformMemoryStream stream); + } + + /// <summary> + /// Compute task holder. + /// </summary> + /// <typeparam name="TA">Type of the task argument.</typeparam> + /// <typeparam name="T">Type of individual job results.</typeparam> + /// <typeparam name="TR">Type of the reduced task result.</typeparam> + internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder + { + /** Empty results. */ + private static readonly IList<IComputeJobResult<T>> EmptyRes = + new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>()); + + /** Compute instance. */ + private readonly ComputeImpl _compute; + + /** Actual task. */ + private readonly IComputeTask<TA, T, TR> _task; + + /** Task argument. */ + private readonly TA _arg; + + /** Results cache flag. */ + private readonly bool _resCache; + + /** Task future. */ + private readonly Future<TR> _fut = new Future<TR>(); + + /** Jobs whose results are cached. */ + private ISet<object> _resJobs; + + /** Cached results. */ + private IList<IComputeJobResult<T>> _ress; + + /** Handles for jobs which are not serialized right away. */ + private volatile List<long> _jobHandles; + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="grid">Grid.</param> + /// <param name="compute">Compute.</param> + /// <param name="task">Task.</param> + /// <param name="arg">Argument.</param> + public ComputeTaskHolder(Ignite grid, ComputeImpl compute, IComputeTask<TA, T, TR> task, TA arg) + { + _compute = compute; + _arg = arg; + _task = task; + + ResourceTypeDescriptor resDesc = ResourceProcessor.Descriptor(task.GetType()); + + IComputeResourceInjector injector = task as IComputeResourceInjector; + + if (injector != null) + injector.Inject(grid); + else + resDesc.InjectIgnite(task, grid); + + // Results are cached unless the task's resource descriptor reports TaskNoResultCache. + _resCache = !resDesc.TaskNoResultCache; + } + + /** <inheritDoc /> */ + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", + Justification = "User code can throw any exception")] + public void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream) + { + IList<IClusterNode> subgrid; + + ClusterGroupImpl prj = (ClusterGroupImpl)_compute.ClusterGroup; + + var ignite = (Ignite) prj.Ignite; + + // 1. Unmarshal topology info if topology changed. + var reader = prj.Marshaller.StartUnmarshal(inStream); + + if (reader.ReadBoolean()) + { + long topVer = reader.ReadLong(); + + // Two ints are consumed here: the first only sizes the full node list, + // the second is the number of node records that follow. + List<IClusterNode> nodes = new List<IClusterNode>(reader.ReadInt()); + + int nodesCnt = reader.ReadInt(); + + subgrid = new List<IClusterNode>(nodesCnt); + + for (int i = 0; i < nodesCnt; i++) + { + IClusterNode node = ignite.GetNode(reader.ReadGuid()); + + nodes.Add(node); + + // Per-node flag: only flagged nodes become part of the subgrid passed to the task. + if (reader.ReadBoolean()) + subgrid.Add(node); + } + + // Update parent projection to help other task callers avoid this overhead. + // Note that there is a chance that topology changed even further and this update fails. + // It means that some of subgrid nodes could have left the Grid. This is not critical + // for us, because Java will handle it gracefully. 
+ prj.UpdateTopology(topVer, nodes); + } + else + { + IList<IClusterNode> nodes = prj.NodesNoRefresh(); + + Debug.Assert(nodes != null, "At least one topology update should have occurred."); + + subgrid = IgniteUtils.Shuffle(nodes); + } + + // 2. Perform map. + IDictionary<IComputeJob<T>, IClusterNode> map; + Exception err; + + try + { + map = _task.Map(subgrid, _arg); + + err = null; + } + catch (Exception e) + { + map = null; + + err = e; + + // Java can receive another exception in case of marshalling failure but it is not important. + Finish(default(TR), e); + } + + // 3. Write map result to the output stream. + PortableWriterImpl writer = prj.Marshaller.StartMarshal(outStream); + + try + { + if (err == null) + { + writer.WriteBoolean(true); // Success flag. + + if (map == null) + writer.WriteBoolean(false); // Map produced no result. + else + { + writer.WriteBoolean(true); // Map produced result. + writer.WriteInt(map.Count); // Amount of mapped jobs. + + // NOTE(review): handles are published to _jobHandles only after the whole loop succeeds; + // if marshalling fails midway, the handles allocated so far are not recorded there - + // confirm they are released elsewhere. + var jobHandles = new List<long>(map.Count); + + foreach (KeyValuePair<IComputeJob<T>, IClusterNode> mapEntry in map) + { + var job = new ComputeJobHolder(_compute.ClusterGroup.Ignite as Ignite, mapEntry.Key.ToNonGeneric()); + + IClusterNode node = mapEntry.Value; + + var jobHandle = ignite.HandleRegistry.Allocate(job); + + jobHandles.Add(jobHandle); + + writer.WriteLong(jobHandle); + + if (node.IsLocal) + writer.WriteBoolean(false); // Job is not serialized. + else + { + writer.WriteBoolean(true); // Job is serialized. + writer.WriteObject(job); + } + + writer.WriteGuid(node.Id); + } + + _jobHandles = jobHandles; + } + } + else + { + writer.WriteBoolean(false); // Map failed. + + // Write error as string because it is not important for Java, we need only to print + // a message in the log. + writer.WriteString("Map step failed [errType=" + err.GetType().Name + + ", errMsg=" + err.Message + ']'); + } + } + catch (Exception e) + { + // Something went wrong during marshaling. 
+ Finish(default(TR), e); + + // Discard whatever was partially written and replace it with a failure record. + outStream.Reset(); + + writer.WriteBoolean(false); // Map failed. + writer.WriteString(e.Message); // Write error message. + } + finally + { + prj.Marshaller.FinishMarshal(writer); + } + } + + /** <inheritDoc /> */ + public int JobResultLocal(ComputeJobHolder job) + { + // The policy enum is returned as its integer value. + return (int)JobResult0(job.JobResult); + } + + /** <inheritDoc /> */ + [SuppressMessage("ReSharper", "PossibleInvalidOperationException")] + public int JobResultRemote(ComputeJobHolder job, PlatformMemoryStream stream) + { + // 1. Unmarshal result. + PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream); + + Guid nodeId = reader.ReadGuid().Value; + bool cancelled = reader.ReadBoolean(); + + try + { + object err; + + var data = PortableUtils.ReadWrappedInvocationResult(reader, out err); + + // 2. Process the result. + return (int) JobResult0(new ComputeJobResultImpl(data, (Exception) err, job.Job, nodeId, cancelled)); + } + catch (Exception e) + { + // Fail the task future first, then rethrow (wrapping anything that is not an IgniteException). + Finish(default(TR), e); + + if (!(e is IgniteException)) + throw new IgniteException("Failed to process job result: " + e.Message, e); + + throw; + } + } + + /** <inheritDoc /> */ + public void Reduce() + { + try + { + // NOTE(review): _ress is created lazily in JobResult0, so with caching enabled it is null + // here if no job result was processed before Reduce - confirm that cannot happen. + TR taskRes = _task.Reduce(_resCache ? _ress : EmptyRes); + + Finish(taskRes, null); + } + catch (Exception e) + { + Finish(default(TR), e); + + if (!(e is IgniteException)) + throw new IgniteException("Failed to reduce task: " + e.Message, e); + + throw; + } + } + + /** <inheritDoc /> */ + public void Complete(long taskHandle) + { + Clean(taskHandle); + } + + /// <summary> + /// Complete task with error. 
+ /// </summary> + /// <param name="taskHandle">Task handle.</param> + /// <param name="e">Error.</param> + public void CompleteWithError(long taskHandle, Exception e) + { + Finish(default(TR), e); + + Clean(taskHandle); + } + + /** <inheritDoc /> */ + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", + Justification = "User object deserialization can throw any exception")] + public void CompleteWithError(long taskHandle, PlatformMemoryStream stream) + { + PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream); + + Exception err; + + try + { + // Leading flag selects the encoding: a serialized result wrapper holding the exception, + // or two strings from which an exception is built. + if (reader.ReadBoolean()) + { + PortableResultWrapper res = reader.ReadObject<PortableUserObject>() + .Deserialize<PortableResultWrapper>(); + + err = (Exception) res.Result; + } + else + err = ExceptionUtils.GetException(reader.ReadString(), reader.ReadString()); + } + catch (Exception e) + { + // The error itself could not be read; report that instead so the task still completes. + err = new IgniteException("Task completed with error, but it cannot be unmarshalled: " + e.Message, e); + } + + CompleteWithError(taskHandle, err); + } + + /// <summary> + /// Task completion future. + /// </summary> + internal IFuture<TR> Future + { + get { return _fut; } + } + + /// <summary> + /// Manually set job handles. Used by closures because they have separate flow for map step. + /// </summary> + /// <param name="jobHandles">Job handles.</param> + internal void JobHandles(List<long> jobHandles) + { + _jobHandles = jobHandles; + } + + /// <summary> + /// Process job result. + /// </summary> + /// <param name="res">Result.</param> + /// <returns>Policy returned by the task's Result callback.</returns> + private ComputeJobResultPolicy JobResult0(IComputeJobResult<object> res) + { + try + { + IList<IComputeJobResult<T>> ress0; + + // 1. Prepare old results. + if (_resCache) + { + // Cache structures are created lazily on the first processed result. + if (_resJobs == null) + { + _resJobs = new HashSet<object>(); + + _ress = new List<IComputeJobResult<T>>(); + } + + ress0 = _ress; + } + else + ress0 = EmptyRes; + + // 2. Invoke user code. + var policy = _task.Result(new ComputeJobResultGenericWrapper<T>(res), ress0); + + // 3. 
Add result to the list only in case of success (i.e. the user callback above did not throw). + if (_resCache) + { + var job = res.Job().Unwrap(); + + if (!_resJobs.Add(job)) + { + // Duplicate result => find and replace it with the new one. + // NOTE(review): the lookup compares item.Job() with the unwrapped job using "==" - + // confirm both sides yield the same instance, otherwise Single() throws. + var oldRes = _ress.Single(item => item.Job() == job); + + _ress.Remove(oldRes); + } + + _ress.Add(new ComputeJobResultGenericWrapper<T>(res)); + } + + return policy; + } + catch (Exception e) + { + Finish(default(TR), e); + + if (!(e is IgniteException)) + throw new IgniteException("Failed to process job result: " + e.Message, e); + + throw; + } + } + + /// <summary> + /// Finish task. + /// </summary> + /// <param name="res">Result.</param> + /// <param name="err">Error.</param> + private void Finish(TR res, Exception err) + { + _fut.OnDone(res, err); + } + + /// <summary> + /// Clean-up task resources: releases every recorded job handle and then the task handle. + /// </summary> + /// <param name="taskHandle">Task handle.</param> + private void Clean(long taskHandle) + { + // Read the volatile field once so the null check and the loop see the same list. + var handles = _jobHandles; + + var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry; + + if (handles != null) + foreach (var handle in handles) + handleRegistry.Release(handle, true); + + handleRegistry.Release(taskHandle, true); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs new file mode 100644 index 0000000..cbd26dd --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Datastream +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Threading; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + + /// <summary> + /// Data streamer batch. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] + internal class DataStreamerBatch<TK, TV> + { + /** Queue. */ + private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>(); + + /** Lock for concurrency. */ + private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); + + /** Previous batch. */ + private volatile DataStreamerBatch<TK, TV> _prev; + + /** Current queue size.*/ + private volatile int _size; + + /** Send guard. */ + private bool _sndGuard; + + /** Future for this batch. */ + private readonly Future<object> _fut = new Future<object>(); + + /// <summary> + /// Constructor. + /// </summary> + public DataStreamerBatch() : this(null) + { + // No-op. + } + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="prev">Previous batch.</param> + public DataStreamerBatch(DataStreamerBatch<TK, TV> prev) + { + _prev = prev; + + if (prev != null) + Thread.MemoryBarrier(); // Prevent "prev" field escape. 
+ + _fut.Listen(() => ParentsCompleted()); + } + + /// <summary> + /// Gets the future. + /// </summary> + public IFuture Future + { + get { return _fut; } + } + + /// <summary> + /// Add object to the batch. + /// </summary> + /// <param name="val">Value.</param> + /// <param name="cnt">Items count.</param> + /// <returns>Positive value in case batch is active, -1 in case no more additions are allowed.</returns> + public int Add(object val, int cnt) + { + // If we cannot enter read-lock immediately, then send is scheduled and batch is definitely blocked. + if (!_rwLock.TryEnterReadLock(0)) + return -1; + + try + { + // 1. Ensure additions are possible + if (_sndGuard) + return -1; + + // 2. Add data and increase size. + _queue.Enqueue(val); + +#pragma warning disable 0420 + int newSize = Interlocked.Add(ref _size, cnt); +#pragma warning restore 0420 + + return newSize; + } + finally + { + _rwLock.ExitReadLock(); + } + } + + /// <summary> + /// Internal send routine. + /// </summary> + /// <param name="ldr">streamer.</param> + /// <param name="plc">Policy.</param> + public void Send(DataStreamerImpl<TK, TV> ldr, int plc) + { + // 1. Delegate to the previous batch first. + DataStreamerBatch<TK, TV> prev0 = _prev; + + if (prev0 != null) + prev0.Send(ldr, DataStreamerImpl<TK, TV>.PlcContinue); + + // 2. Set guard. + _rwLock.EnterWriteLock(); + + try + { + // The guard is flipped under the write lock, so only the first caller proceeds to the send. + if (_sndGuard) + return; + else + _sndGuard = true; + } + finally + { + _rwLock.ExitWriteLock(); + } + + var handleRegistry = ldr.Marshaller.Ignite.HandleRegistry; + + long futHnd = 0; + + // 3. Actual send. 
+ ldr.Update(writer => + { + writer.WriteInt(plc); + + if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose) + { + futHnd = handleRegistry.Allocate(_fut); + + try + { + writer.WriteLong(futHnd); + + WriteTo(writer); + } + catch (Exception) + { + // Do not leave the future handle allocated if writing the batch failed. + handleRegistry.Release(futHnd); + + throw; + } + } + }); + + // For a cancel-close policy or an empty batch the future is completed right here + // and its handle released. + // NOTE(review): with PlcCancelClose no handle is allocated, so futHnd is still 0 when + // released - confirm HandleRegistry.Release tolerates that. + if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0) + { + _fut.OnNullResult(); + + handleRegistry.Release(futHnd); + } + } + + + /// <summary> + /// Await completion of current and all previous loads. + /// </summary> + public void AwaitCompletion() + { + DataStreamerBatch<TK, TV> curBatch = this; + + while (curBatch != null) + { + try + { + curBatch._fut.Get(); + } + catch (Exception) + { + // Ignore. + } + + curBatch = curBatch._prev; + } + } + + /// <summary> + /// Write batch content. + /// </summary> + /// <param name="writer">Portable writer.</param> + private void WriteTo(PortableWriterImpl writer) + { + writer.WriteInt(_size); + + object val; + + while (_queue.TryDequeue(out val)) + { + // 1. Is it a collection? + ICollection<KeyValuePair<TK, TV>> entries = val as ICollection<KeyValuePair<TK, TV>>; + + if (entries != null) + { + foreach (KeyValuePair<TK, TV> item in entries) + { + writer.Write(item.Key); + writer.Write(item.Value); + } + + continue; + } + + // 2. Is it a single entry? + DataStreamerEntry<TK, TV> entry = val as DataStreamerEntry<TK, TV>; + + if (entry != null) { + writer.Write(entry.Key); + writer.Write(entry.Value); + + continue; + } + + // 3. Is it remove marker? + DataStreamerRemoveEntry<TK> rmvEntry = val as DataStreamerRemoveEntry<TK>; + + if (rmvEntry != null) + { + // A removal is written as the key followed by a null value. + writer.Write(rmvEntry.Key); + writer.Write<object>(null); + } + } + } + + /// <summary> + /// Check whether all previous batches are completed. 
+ /// </summary> + /// <returns>False if a previous batch is not yet completed; otherwise whether this batch's future is done.</returns> + private bool ParentsCompleted() + { + DataStreamerBatch<TK, TV> prev0 = _prev; + + if (prev0 != null) + { + // Once the whole chain behind us is completed, drop the reference to it. + if (prev0.ParentsCompleted()) + _prev = null; + else + return false; + } + + return _fut.IsDone; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs new file mode 100644 index 0000000..41ee176 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Datastream +{ + /// <summary> + /// Data streamer entry. + /// </summary> + internal class DataStreamerEntry<TK, TV> + { + /** Key. */ + private readonly TK _key; + + /** Value. */ + private readonly TV _val; + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="key">Entry key.</param> + /// <param name="val">Entry value.</param> + public DataStreamerEntry(TK key, TV val) + { + _key = key; + _val = val; + } + + /// <summary> + /// Key passed to the constructor. + /// </summary> + public TK Key + { + get + { + return _key; + } + } + + /// <summary> + /// Value passed to the constructor. + /// </summary> + public TV Value + { + get + { + return _val; + } + } + } +}
