http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs deleted file mode 100644 index dfe0d18..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.Linq; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Impl.Cluster; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Compute.Closure; - using Apache.Ignite.Core.Impl.Memory; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Resource; - - /// <summary> - /// Compute task holder interface used to avoid generics. - /// </summary> - internal interface IComputeTaskHolder - { - /// <summary> - /// Perform map step. - /// </summary> - /// <param name="inStream">Stream with IN data (topology info).</param> - /// <param name="outStream">Stream for OUT data (map result).</param> - /// <returns>Map with produced jobs.</returns> - void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream); - - /// <summary> - /// Process local job result. - /// </summary> - /// <param name="jobId">Job pointer.</param> - /// <returns>Policy.</returns> - int JobResultLocal(ComputeJobHolder jobId); - - /// <summary> - /// Process remote job result. - /// </summary> - /// <param name="jobId">Job pointer.</param> - /// <param name="stream">Stream.</param> - /// <returns>Policy.</returns> - int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream); - - /// <summary> - /// Perform task reduce. - /// </summary> - void Reduce(); - - /// <summary> - /// Complete task. - /// </summary> - /// <param name="taskHandle">Task handle.</param> - void Complete(long taskHandle); - - /// <summary> - /// Complete task with error. - /// </summary> - /// <param name="taskHandle">Task handle.</param> - /// <param name="stream">Stream with serialized exception.</param> - void CompleteWithError(long taskHandle, PlatformMemoryStream stream); - } - - /// <summary> - /// Compute task holder. - /// </summary> - internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder - { - /** Empty results. */ - private static readonly IList<IComputeJobResult<T>> EmptyRes = - new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>()); - - /** Compute instance. */ - private readonly ComputeImpl _compute; - - /** Actual task. */ - private readonly IComputeTask<TA, T, TR> _task; - - /** Task argument. */ - private readonly TA _arg; - - /** Results cache flag. */ - private readonly bool _resCache; - - /** Task future. */ - private readonly Future<TR> _fut = new Future<TR>(); - - /** Jobs whose results are cached. */ - private ISet<object> _resJobs; - - /** Cached results. */ - private IList<IComputeJobResult<T>> _ress; - - /** Handles for jobs which are not serialized right away. */ - private volatile List<long> _jobHandles; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="grid">Grid.</param> - /// <param name="compute">Compute.</param> - /// <param name="task">Task.</param> - /// <param name="arg">Argument.</param> - public ComputeTaskHolder(Ignite grid, ComputeImpl compute, IComputeTask<TA, T, TR> task, TA arg) - { - _compute = compute; - _arg = arg; - _task = task; - - ResourceTypeDescriptor resDesc = ResourceProcessor.Descriptor(task.GetType()); - - IComputeResourceInjector injector = task as IComputeResourceInjector; - - if (injector != null) - injector.Inject(grid); - else - resDesc.InjectIgnite(task, grid); - - _resCache = !resDesc.TaskNoResultCache; - } - - /** <inheritDoc /> */ - [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", - Justification = "User code can throw any exception")] - public void Map(PlatformMemoryStream inStream, PlatformMemoryStream outStream) - { - IList<IClusterNode> subgrid; - - ClusterGroupImpl prj = (ClusterGroupImpl)_compute.ClusterGroup; - - var ignite = (Ignite) prj.Ignite; - - // 1. Unmarshal topology info if topology changed. - var reader = prj.Marshaller.StartUnmarshal(inStream); - - if (reader.ReadBoolean()) - { - long topVer = reader.ReadLong(); - - List<IClusterNode> nodes = new List<IClusterNode>(reader.ReadInt()); - - int nodesCnt = reader.ReadInt(); - - subgrid = new List<IClusterNode>(nodesCnt); - - for (int i = 0; i < nodesCnt; i++) - { - IClusterNode node = ignite.GetNode(reader.ReadGuid()); - - nodes.Add(node); - - if (reader.ReadBoolean()) - subgrid.Add(node); - } - - // Update parent projection to help other task callers avoid this overhead. - // Note that there is a chance that topology changed even further and this update fails. - // It means that some of subgrid nodes could have left the Grid. This is not critical - // for us, because Java will handle it gracefully. - prj.UpdateTopology(topVer, nodes); - } - else - { - IList<IClusterNode> nodes = prj.NodesNoRefresh(); - - Debug.Assert(nodes != null, "At least one topology update should have occurred."); - - subgrid = IgniteUtils.Shuffle(nodes); - } - - // 2. Perform map. - IDictionary<IComputeJob<T>, IClusterNode> map; - Exception err; - - try - { - map = _task.Map(subgrid, _arg); - - err = null; - } - catch (Exception e) - { - map = null; - - err = e; - - // Java can receive another exception in case of marshalling failure but it is not important. - Finish(default(TR), e); - } - - // 3. Write map result to the output stream. - PortableWriterImpl writer = prj.Marshaller.StartMarshal(outStream); - - try - { - if (err == null) - { - writer.WriteBoolean(true); // Success flag. - - if (map == null) - writer.WriteBoolean(false); // Map produced no result. - else - { - writer.WriteBoolean(true); // Map produced result. - writer.WriteInt(map.Count); // Amount of mapped jobs. - - var jobHandles = new List<long>(map.Count); - - foreach (KeyValuePair<IComputeJob<T>, IClusterNode> mapEntry in map) - { - var job = new ComputeJobHolder(_compute.ClusterGroup.Ignite as Ignite, mapEntry.Key.ToNonGeneric()); - - IClusterNode node = mapEntry.Value; - - var jobHandle = ignite.HandleRegistry.Allocate(job); - - jobHandles.Add(jobHandle); - - writer.WriteLong(jobHandle); - - if (node.IsLocal) - writer.WriteBoolean(false); // Job is not serialized. - else - { - writer.WriteBoolean(true); // Job is serialized. - writer.WriteObject(job); - } - - writer.WriteGuid(node.Id); - } - - _jobHandles = jobHandles; - } - } - else - { - writer.WriteBoolean(false); // Map failed. - - // Write error as string because it is not important for Java, we need only to print - // a message in the log. - writer.WriteString("Map step failed [errType=" + err.GetType().Name + - ", errMsg=" + err.Message + ']'); - } - } - catch (Exception e) - { - // Something went wrong during marshaling. - Finish(default(TR), e); - - outStream.Reset(); - - writer.WriteBoolean(false); // Map failed. - writer.WriteString(e.Message); // Write error message. - } - finally - { - prj.Marshaller.FinishMarshal(writer); - } - } - - /** <inheritDoc /> */ - public int JobResultLocal(ComputeJobHolder job) - { - return (int)JobResult0(job.JobResult); - } - - /** <inheritDoc /> */ - [SuppressMessage("ReSharper", "PossibleInvalidOperationException")] - public int JobResultRemote(ComputeJobHolder job, PlatformMemoryStream stream) - { - // 1. Unmarshal result. - PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream); - - Guid nodeId = reader.ReadGuid().Value; - bool cancelled = reader.ReadBoolean(); - - try - { - object err; - - var data = PortableUtils.ReadWrappedInvocationResult(reader, out err); - - // 2. Process the result. - return (int) JobResult0(new ComputeJobResultImpl(data, (Exception) err, job.Job, nodeId, cancelled)); - } - catch (Exception e) - { - Finish(default(TR), e); - - if (!(e is IgniteException)) - throw new IgniteException("Failed to process job result: " + e.Message, e); - - throw; - } - } - - /** <inheritDoc /> */ - public void Reduce() - { - try - { - TR taskRes = _task.Reduce(_resCache ? _ress : EmptyRes); - - Finish(taskRes, null); - } - catch (Exception e) - { - Finish(default(TR), e); - - if (!(e is IgniteException)) - throw new IgniteException("Failed to reduce task: " + e.Message, e); - - throw; - } - } - - /** <inheritDoc /> */ - public void Complete(long taskHandle) - { - Clean(taskHandle); - } - - /// <summary> - /// Complete task with error. - /// </summary> - /// <param name="taskHandle">Task handle.</param> - /// <param name="e">Error.</param> - public void CompleteWithError(long taskHandle, Exception e) - { - Finish(default(TR), e); - - Clean(taskHandle); - } - - /** <inheritDoc /> */ - [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", - Justification = "User object deserialization can throw any exception")] - public void CompleteWithError(long taskHandle, PlatformMemoryStream stream) - { - PortableReaderImpl reader = _compute.Marshaller.StartUnmarshal(stream); - - Exception err; - - try - { - if (reader.ReadBoolean()) - { - PortableResultWrapper res = reader.ReadObject<PortableUserObject>() - .Deserialize<PortableResultWrapper>(); - - err = (Exception) res.Result; - } - else - err = ExceptionUtils.GetException(reader.ReadString(), reader.ReadString()); - } - catch (Exception e) - { - err = new IgniteException("Task completed with error, but it cannot be unmarshalled: " + e.Message, e); - } - - CompleteWithError(taskHandle, err); - } - - /// <summary> - /// Task completion future. - /// </summary> - internal IFuture<TR> Future - { - get { return _fut; } - } - - /// <summary> - /// Manually set job handles. Used by closures because they have separate flow for map step. - /// </summary> - /// <param name="jobHandles">Job handles.</param> - internal void JobHandles(List<long> jobHandles) - { - _jobHandles = jobHandles; - } - - /// <summary> - /// Process job result. - /// </summary> - /// <param name="res">Result.</param> - private ComputeJobResultPolicy JobResult0(IComputeJobResult<object> res) - { - try - { - IList<IComputeJobResult<T>> ress0; - - // 1. Prepare old results. - if (_resCache) - { - if (_resJobs == null) - { - _resJobs = new HashSet<object>(); - - _ress = new List<IComputeJobResult<T>>(); - } - - ress0 = _ress; - } - else - ress0 = EmptyRes; - - // 2. Invoke user code. - var policy = _task.Result(new ComputeJobResultGenericWrapper<T>(res), ress0); - - // 3. Add result to the list only in case of success. - if (_resCache) - { - var job = res.Job().Unwrap(); - - if (!_resJobs.Add(job)) - { - // Duplicate result => find and replace it with the new one. - var oldRes = _ress.Single(item => item.Job() == job); - - _ress.Remove(oldRes); - } - - _ress.Add(new ComputeJobResultGenericWrapper<T>(res)); - } - - return policy; - } - catch (Exception e) - { - Finish(default(TR), e); - - if (!(e is IgniteException)) - throw new IgniteException("Failed to process job result: " + e.Message, e); - - throw; - } - } - - /// <summary> - /// Finish task. - /// </summary> - /// <param name="res">Result.</param> - /// <param name="err">Error.</param> - private void Finish(TR res, Exception err) - { - _fut.OnDone(res, err); - } - - /// <summary> - /// Clean-up task resources. - /// </summary> - /// <param name="taskHandle"></param> - private void Clean(long taskHandle) - { - var handles = _jobHandles; - - var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry; - - if (handles != null) - foreach (var handle in handles) - handleRegistry.Release(handle, true); - - handleRegistry.Release(taskHandle, true); - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs deleted file mode 100644 index cbd26dd..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Datastream -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Portable; - - /// <summary> - /// Data streamer batch. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class DataStreamerBatch<TK, TV> - { - /** Queue. */ - private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>(); - - /** Lock for concurrency. */ - private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); - - /** Previous batch. */ - private volatile DataStreamerBatch<TK, TV> _prev; - - /** Current queue size.*/ - private volatile int _size; - - /** Send guard. */ - private bool _sndGuard; - - /** */ - private readonly Future<object> _fut = new Future<object>(); - - /// <summary> - /// Constructor. - /// </summary> - public DataStreamerBatch() : this(null) - { - // No-op. - } - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="prev">Previous batch.</param> - public DataStreamerBatch(DataStreamerBatch<TK, TV> prev) - { - _prev = prev; - - if (prev != null) - Thread.MemoryBarrier(); // Prevent "prev" field escape. - - _fut.Listen(() => ParentsCompleted()); - } - - /// <summary> - /// Gets the future. - /// </summary> - public IFuture Future - { - get { return _fut; } - } - - /// <summary> - /// Add object to the batch. - /// </summary> - /// <param name="val">Value.</param> - /// <param name="cnt">Items count.</param> - /// <returns>Positive value in case batch is active, -1 in case no more additions are allowed.</returns> - public int Add(object val, int cnt) - { - // If we cannot enter read-lock immediately, then send is scheduled and batch is definetely blocked. - if (!_rwLock.TryEnterReadLock(0)) - return -1; - - try - { - // 1. Ensure additions are possible - if (_sndGuard) - return -1; - - // 2. Add data and increase size. - _queue.Enqueue(val); - -#pragma warning disable 0420 - int newSize = Interlocked.Add(ref _size, cnt); -#pragma warning restore 0420 - - return newSize; - } - finally - { - _rwLock.ExitReadLock(); - } - } - - /// <summary> - /// Internal send routine. - /// </summary> - /// <param name="ldr">streamer.</param> - /// <param name="plc">Policy.</param> - public void Send(DataStreamerImpl<TK, TV> ldr, int plc) - { - // 1. Delegate to the previous batch first. - DataStreamerBatch<TK, TV> prev0 = _prev; - - if (prev0 != null) - prev0.Send(ldr, DataStreamerImpl<TK, TV>.PlcContinue); - - // 2. Set guard. - _rwLock.EnterWriteLock(); - - try - { - if (_sndGuard) - return; - else - _sndGuard = true; - } - finally - { - _rwLock.ExitWriteLock(); - } - - var handleRegistry = ldr.Marshaller.Ignite.HandleRegistry; - - long futHnd = 0; - - // 3. Actual send. - ldr.Update(writer => - { - writer.WriteInt(plc); - - if (plc != DataStreamerImpl<TK, TV>.PlcCancelClose) - { - futHnd = handleRegistry.Allocate(_fut); - - try - { - writer.WriteLong(futHnd); - - WriteTo(writer); - } - catch (Exception) - { - handleRegistry.Release(futHnd); - - throw; - } - } - }); - - if (plc == DataStreamerImpl<TK, TV>.PlcCancelClose || _size == 0) - { - _fut.OnNullResult(); - - handleRegistry.Release(futHnd); - } - } - - - /// <summary> - /// Await completion of current and all previous loads. - /// </summary> - public void AwaitCompletion() - { - DataStreamerBatch<TK, TV> curBatch = this; - - while (curBatch != null) - { - try - { - curBatch._fut.Get(); - } - catch (Exception) - { - // Ignore. - } - - curBatch = curBatch._prev; - } - } - - /// <summary> - /// Write batch content. - /// </summary> - /// <param name="writer">Portable writer.</param> - private void WriteTo(PortableWriterImpl writer) - { - writer.WriteInt(_size); - - object val; - - while (_queue.TryDequeue(out val)) - { - // 1. Is it a collection? - ICollection<KeyValuePair<TK, TV>> entries = val as ICollection<KeyValuePair<TK, TV>>; - - if (entries != null) - { - foreach (KeyValuePair<TK, TV> item in entries) - { - writer.Write(item.Key); - writer.Write(item.Value); - } - - continue; - } - - // 2. Is it a single entry? - DataStreamerEntry<TK, TV> entry = val as DataStreamerEntry<TK, TV>; - - if (entry != null) { - writer.Write(entry.Key); - writer.Write(entry.Value); - - continue; - } - - // 3. Is it remove merker? - DataStreamerRemoveEntry<TK> rmvEntry = val as DataStreamerRemoveEntry<TK>; - - if (rmvEntry != null) - { - writer.Write(rmvEntry.Key); - writer.Write<object>(null); - } - } - } - - /// <summary> - /// Checck whether all previous batches are completed. - /// </summary> - /// <returns></returns> - private bool ParentsCompleted() - { - DataStreamerBatch<TK, TV> prev0 = _prev; - - if (prev0 != null) - { - if (prev0.ParentsCompleted()) - _prev = null; - else - return false; - } - - return _fut.IsDone; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs deleted file mode 100644 index 41ee176..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerEntry.cs +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Datastream -{ - /// <summary> - /// Data streamer entry. - /// </summary> - internal class DataStreamerEntry<TK, TV> - { - /** Key. */ - private readonly TK _key; - - /** Value. */ - private readonly TV _val; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="key">Key.</param> - /// <param name="val">Value.</param> - public DataStreamerEntry(TK key, TV val) - { - _key = key; - _val = val; - } - - /// <summary> - /// Key. - /// </summary> - public TK Key - { - get - { - return _key; - } - } - - /// <summary> - /// Value. - /// </summary> - public TV Value - { - get - { - return _val; - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs deleted file mode 100644 index bf11397..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ /dev/null @@ -1,832 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Datastream -{ - using System; - using System.Collections.Generic; - using System.Threading; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Datastream; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - - /// <summary> - /// Data streamer internal interface to get rid of generics. - /// </summary> - internal interface IDataStreamer - { - /// <summary> - /// Callback invoked on topology size change. - /// </summary> - /// <param name="topVer">New topology version.</param> - /// <param name="topSize">New topology size.</param> - void TopologyChange(long topVer, int topSize); - } - - /// <summary> - /// Data streamer implementation. - /// </summary> - internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV> - { - -#pragma warning disable 0420 - - /** Policy: continue. */ - internal const int PlcContinue = 0; - - /** Policy: close. */ - internal const int PlcClose = 1; - - /** Policy: cancel and close. */ - internal const int PlcCancelClose = 2; - - /** Policy: flush. */ - internal const int PlcFlush = 3; - - /** Operation: update. */ - private const int OpUpdate = 1; - - /** Operation: set receiver. */ - private const int OpReceiver = 2; - - /** Cache name. */ - private readonly string _cacheName; - - /** Lock. */ - private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); - - /** Closed event. */ - private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false); - - /** Close future. */ - private readonly Future<object> _closeFut = new Future<object>(); - - /** GC handle to this streamer. */ - private readonly long _hnd; - - /** Topology version. */ - private long _topVer; - - /** Topology size. */ - private int _topSize; - - /** Buffer send size. */ - private volatile int _bufSndSize; - - /** Current data streamer batch. */ - private volatile DataStreamerBatch<TK, TV> _batch; - - /** Flusher. */ - private readonly Flusher<TK, TV> _flusher; - - /** Receiver. */ - private volatile IStreamReceiver<TK, TV> _rcv; - - /** Receiver handle. */ - private long _rcvHnd; - - /** Receiver portable mode. */ - private readonly bool _keepPortable; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="cacheName">Cache name.</param> - /// <param name="keepPortable">Portable flag.</param> - public DataStreamerImpl(IUnmanagedTarget target, PortableMarshaller marsh, string cacheName, bool keepPortable) - : base(target, marsh) - { - _cacheName = cacheName; - _keepPortable = keepPortable; - - // Create empty batch. - _batch = new DataStreamerBatch<TK, TV>(); - - // Allocate GC handle so that this data streamer could be easily dereferenced from native code. - WeakReference thisRef = new WeakReference(this); - - _hnd = marsh.Ignite.HandleRegistry.Allocate(thisRef); - - // Start topology listening. This call will ensure that buffer size member is updated. - UU.DataStreamerListenTopology(target, _hnd); - - // Membar to ensure fields initialization before leaving constructor. - Thread.MemoryBarrier(); - - // Start flusher after everything else is initialized. - _flusher = new Flusher<TK, TV>(thisRef); - - _flusher.RunThread(); - } - - /** <inheritDoc /> */ - public string CacheName - { - get { return _cacheName; } - } - - /** <inheritDoc /> */ - public bool AllowOverwrite - { - get - { - _rwLock.EnterReadLock(); - - try - { - ThrowIfDisposed(); - - return UU.DataStreamerAllowOverwriteGet(Target); - } - finally - { - _rwLock.ExitReadLock(); - } - } - set - { - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - UU.DataStreamerAllowOverwriteSet(Target, value); - } - finally - { - _rwLock.ExitWriteLock(); - } - } - } - - /** <inheritDoc /> */ - public bool SkipStore - { - get - { - _rwLock.EnterReadLock(); - - try - { - ThrowIfDisposed(); - - return UU.DataStreamerSkipStoreGet(Target); - } - finally - { - _rwLock.ExitReadLock(); - } - } - set - { - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - UU.DataStreamerSkipStoreSet(Target, value); - } - finally - { - _rwLock.ExitWriteLock(); - } - } - } - - /** <inheritDoc /> */ - public int PerNodeBufferSize - { - get - { - _rwLock.EnterReadLock(); - - try - { - ThrowIfDisposed(); - - return UU.DataStreamerPerNodeBufferSizeGet(Target); - } - finally - { - _rwLock.ExitReadLock(); - } - } - set - { - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - UU.DataStreamerPerNodeBufferSizeSet(Target, value); - - _bufSndSize = _topSize * value; - } - finally - { - _rwLock.ExitWriteLock(); - } - } - } - - /** <inheritDoc /> */ - public int PerNodeParallelOperations - { - get - { - _rwLock.EnterReadLock(); - - try - { - ThrowIfDisposed(); - - return UU.DataStreamerPerNodeParallelOperationsGet(Target); - } - finally - { - _rwLock.ExitReadLock(); - } - - } - set - { - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - UU.DataStreamerPerNodeParallelOperationsSet(Target, value); - } - finally - { - _rwLock.ExitWriteLock(); - } - - } - } - - /** <inheritDoc /> */ - public long AutoFlushFrequency - { - get - { - _rwLock.EnterReadLock(); - - try - { - ThrowIfDisposed(); - - return _flusher.Frequency; - } - finally - { - _rwLock.ExitReadLock(); - } - - } - set - { - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - _flusher.Frequency = value; - } - finally - { - _rwLock.ExitWriteLock(); - } - } - } - - /** <inheritDoc /> */ - public IFuture Future - { - get - { - ThrowIfDisposed(); - - return _closeFut; - } - } - - /** <inheritDoc /> */ - public IStreamReceiver<TK, TV> Receiver - { - get - { - ThrowIfDisposed(); - - return _rcv; - } - set - { - IgniteArgumentCheck.NotNull(value, "value"); - - var handleRegistry = Marshaller.Ignite.HandleRegistry; - - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - if (_rcv == value) - return; - - var rcvHolder = new StreamReceiverHolder(value, - (rec, grid, cache, stream, keepPortable) => - StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream, - keepPortable)); - - var rcvHnd0 = handleRegistry.Allocate(rcvHolder); - - try - { - DoOutOp(OpReceiver, w => - { - w.WriteLong(rcvHnd0); - - w.WriteObject(rcvHolder); - }); - } - catch (Exception) - { - handleRegistry.Release(rcvHnd0); - throw; - } - - if (_rcv != null) - handleRegistry.Release(_rcvHnd); - - _rcv = value; - _rcvHnd = rcvHnd0; - } - finally - { - _rwLock.ExitWriteLock(); - } - } - } - - /** <inheritDoc /> */ - public IFuture AddData(TK key, TV val) - { - ThrowIfDisposed(); - - IgniteArgumentCheck.NotNull(key, "key"); - - return Add0(new DataStreamerEntry<TK, TV>(key, val), 1); - } - - /** <inheritDoc /> */ - public IFuture AddData(KeyValuePair<TK, TV> pair) - { - ThrowIfDisposed(); - - return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1); - } - - /** <inheritDoc /> */ - public IFuture AddData(ICollection<KeyValuePair<TK, TV>> entries) - { - ThrowIfDisposed(); - - IgniteArgumentCheck.NotNull(entries, "entries"); - - return Add0(entries, entries.Count); - } - - /** <inheritDoc /> */ - public IFuture RemoveData(TK key) - { - ThrowIfDisposed(); - - IgniteArgumentCheck.NotNull(key, "key"); - - return Add0(new DataStreamerRemoveEntry<TK>(key), 1); - } - - /** <inheritDoc /> */ - public void TryFlush() - { - ThrowIfDisposed(); - - DataStreamerBatch<TK, TV> batch0 = _batch; - - if (batch0 != null) - Flush0(batch0, false, PlcFlush); - } - - /** <inheritDoc /> */ - public void Flush() - { - ThrowIfDisposed(); - - DataStreamerBatch<TK, TV> batch0 = _batch; - - if (batch0 != null) - Flush0(batch0, true, PlcFlush); - else - { - // Batch is null, i.e. data streamer is closing. Wait for close to complete. - _closedEvt.Wait(); - } - } - - /** <inheritDoc /> */ - public void Close(bool cancel) - { - _flusher.Stop(); - - while (true) - { - DataStreamerBatch<TK, TV> batch0 = _batch; - - if (batch0 == null) - { - // Wait for concurrent close to finish. - _closedEvt.Wait(); - - return; - } - - if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose)) - { - _closeFut.OnDone(null, null); - - _rwLock.EnterWriteLock(); - - try - { - base.Dispose(true); - - if (_rcv != null) - Marshaller.Ignite.HandleRegistry.Release(_rcvHnd); - - _closedEvt.Set(); - } - finally - { - _rwLock.ExitWriteLock(); - } - - Marshaller.Ignite.HandleRegistry.Release(_hnd); - - break; - } - } - } - - /** <inheritDoc /> */ - public IDataStreamer<TK1, TV1> WithKeepPortable<TK1, TV1>() - { - if (_keepPortable) - { - var result = this as IDataStreamer<TK1, TV1>; - - if (result == null) - throw new InvalidOperationException( - "Can't change type of portable streamer. WithKeepPortable has been called on an instance of " + - "portable streamer with incompatible generic arguments."); - - return result; - } - - return new DataStreamerImpl<TK1, TV1>(UU.ProcessorDataStreamer(Marshaller.Ignite.InteropProcessor, - _cacheName, true), Marshaller, _cacheName, true); - } - - /** <inheritDoc /> */ - protected override void Dispose(bool disposing) - { - if (disposing) - Close(false); // Normal dispose: do not cancel - else - { - // Finalizer: just close Java streamer - try - { - if (_batch != null) - _batch.Send(this, PlcCancelClose); - } - catch (Exception) - { - // Finalizers should never throw - } - - Marshaller.Ignite.HandleRegistry.Release(_hnd, true); - Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true); - - base.Dispose(false); - } - } - - /** <inheritDoc /> */ - ~DataStreamerImpl() - { - Dispose(false); - } - - /** <inheritDoc /> */ - public void TopologyChange(long topVer, int topSize) - { - _rwLock.EnterWriteLock(); - - try - { - ThrowIfDisposed(); - - if (_topVer < topVer) - { - _topVer = topVer; - _topSize = topSize; - - _bufSndSize = topSize * UU.DataStreamerPerNodeBufferSizeGet(Target); - } - } - finally - { - _rwLock.ExitWriteLock(); - } - - } - - /// <summary> - /// Internal add/remove routine. - /// </summary> - /// <param name="val">Value.</param> - /// <param name="cnt">Items count.</param> - /// <returns>Future.</returns> - private IFuture Add0(object val, int cnt) - { - int bufSndSize0 = _bufSndSize; - - while (true) - { - var batch0 = _batch; - - if (batch0 == null) - throw new InvalidOperationException("Data streamer is stopped."); - - int size = batch0.Add(val, cnt); - - if (size == -1) - { - // Batch is blocked, perform CAS. - Interlocked.CompareExchange(ref _batch, - new DataStreamerBatch<TK, TV>(batch0), batch0); - - continue; - } - if (size >= bufSndSize0) - // Batch is too big, schedule flush. - Flush0(batch0, false, PlcContinue); - - return batch0.Future; - } - } - - /// <summary> - /// Internal flush routine. - /// </summary> - /// <param name="curBatch"></param> - /// <param name="wait">Whether to wait for flush to complete.</param> - /// <param name="plc">Whether this is the last batch.</param> - /// <returns>Whether this call was able to CAS previous batch</returns> - private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc) - { - // 1. Try setting new current batch to help further adders. - bool res = Interlocked.CompareExchange(ref _batch, - (plc == PlcContinue || plc == PlcFlush) ? - new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch; - - // 2. Perform actual send. - curBatch.Send(this, plc); - - if (wait) - // 3. Wait for all futures to finish. - curBatch.AwaitCompletion(); - - return res; - } - - /// <summary> - /// Start write. - /// </summary> - /// <returns>Writer.</returns> - internal void Update(Action<PortableWriterImpl> action) - { - _rwLock.EnterReadLock(); - - try - { - ThrowIfDisposed(); - - DoOutOp(OpUpdate, action); - } - finally - { - _rwLock.ExitReadLock(); - } - } - - /// <summary> - /// Flusher. - /// </summary> - private class Flusher<TK1, TV1> - { - /** State: running. */ - private const int StateRunning = 0; - - /** State: stopping. */ - private const int StateStopping = 1; - - /** State: stopped. */ - private const int StateStopped = 2; - - /** Data streamer. */ - private readonly WeakReference _ldrRef; - - /** Finish flag. */ - private int _state; - - /** Flush frequency. */ - private long _freq; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="ldrRef">Data streamer weak reference..</param> - public Flusher(WeakReference ldrRef) - { - _ldrRef = ldrRef; - - lock (this) - { - _state = StateRunning; - } - } - - /// <summary> - /// Main flusher routine. - /// </summary> - private void Run() - { - bool force = false; - long curFreq = 0; - - try - { - while (true) - { - if (curFreq > 0 || force) - { - var ldr = _ldrRef.Target as DataStreamerImpl<TK1, TV1>; - - if (ldr == null) - return; - - ldr.TryFlush(); - - force = false; - } - - lock (this) - { - // Stop immediately. - if (_state == StateStopping) - return; - - if (curFreq == _freq) - { - // Frequency is unchanged - if (curFreq == 0) - // Just wait for a second and re-try. - Monitor.Wait(this, 1000); - else - { - // Calculate remaining time. - DateTime now = DateTime.Now; - - long ticks; - - try - { - ticks = now.AddMilliseconds(curFreq).Ticks - now.Ticks; - - if (ticks > int.MaxValue) - ticks = int.MaxValue; - } - catch (ArgumentOutOfRangeException) - { - // Handle possible overflow. - ticks = int.MaxValue; - } - - Monitor.Wait(this, TimeSpan.FromTicks(ticks)); - } - } - else - { - if (curFreq != 0) - force = true; - - curFreq = _freq; - } - } - } - } - finally - { - // Let streamer know about stop. - lock (this) - { - _state = StateStopped; - - Monitor.PulseAll(this); - } - } - } - - /// <summary> - /// Frequency. - /// </summary> - public long Frequency - { - get - { - return Interlocked.Read(ref _freq); - } - - set - { - lock (this) - { - if (_freq != value) - { - _freq = value; - - Monitor.PulseAll(this); - } - } - } - } - - /// <summary> - /// Stop flusher. - /// </summary> - public void Stop() - { - lock (this) - { - if (_state == StateRunning) - { - _state = StateStopping; - - Monitor.PulseAll(this); - } - - while (_state != StateStopped) - Monitor.Wait(this); - } - } - - /// <summary> - /// Runs the flusher thread. - /// </summary> - public void RunThread() - { - new Thread(Run).Start(); - } - } - -#pragma warning restore 0420 - - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs deleted file mode 100644 index 7e65934..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Datastream -{ - /// <summary> - /// Remove marker. - /// </summary> - internal class DataStreamerRemoveEntry<TK> - { - /** Key to remove. */ - private readonly TK _key; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="key">Key.</param> - public DataStreamerRemoveEntry(TK key) - { - _key = key; - } - - /// <summary> - /// Key. - /// </summary> - public TK Key - { - get - { - return _key; - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs deleted file mode 100644 index 5a7c104..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Datastream -{ - using System; - using System.Collections.Generic; - using System.Diagnostics; - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Datastream; - using Apache.Ignite.Core.Impl.Cache; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Unmanaged; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Portable wrapper for <see cref="IStreamReceiver{TK,TV}"/>. - /// </summary> - internal class StreamReceiverHolder : IPortableWriteAware - { - /** */ - private const byte RcvNormal = 0; - - /** */ - public const byte RcvTransformer = 1; - - /** Generic receiver. */ - private readonly object _rcv; - - /** Invoker delegate. */ - private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _invoke; - - /// <summary> - /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public StreamReceiverHolder(PortableReaderImpl reader) - { - var rcvType = reader.ReadByte(); - - _rcv = PortableUtils.ReadPortableOrSerializable<object>(reader); - - Debug.Assert(_rcv != null); - - var type = _rcv.GetType(); - - if (rcvType == RcvTransformer) - { - // rcv is a user ICacheEntryProcessor<K, V, A, R>, construct StreamTransformer from it. - // (we can't marshal StreamTransformer directly, because it is generic, - // and we do not know type arguments that user will have) - _rcv = DelegateTypeDescriptor.GetStreamTransformerCtor(type)(_rcv); - } - - _invoke = DelegateTypeDescriptor.GetStreamReceiver(_rcv.GetType()); - } - - /// <summary> - /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class. - /// </summary> - /// <param name="rcv">Receiver.</param> - /// <param name="invoke">Invoke delegate.</param> - public StreamReceiverHolder(object rcv, - Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> invoke) - { - Debug.Assert(rcv != null); - Debug.Assert(invoke != null); - - _rcv = rcv; - _invoke = invoke; - } - - /** <inheritdoc /> */ - public void WritePortable(IPortableWriter writer) - { - var w = writer.RawWriter(); - - var writeAware = _rcv as IPortableWriteAware; - - if (writeAware != null) - writeAware.WritePortable(writer); - else - { - w.WriteByte(RcvNormal); - PortableUtils.WritePortableOrSerializable((PortableWriterImpl) writer, _rcv); - } - } - - /// <summary> - /// Updates cache with batch of entries. - /// </summary> - /// <param name="grid">The grid.</param> - /// <param name="cache">Cache.</param> - /// <param name="stream">Stream.</param> - /// <param name="keepPortable">Portable flag.</param> - public void Receive(Ignite grid, IUnmanagedTarget cache, IPortableStream stream, bool keepPortable) - { - Debug.Assert(grid != null); - Debug.Assert(cache != null); - Debug.Assert(stream != null); - - _invoke(_rcv, grid, cache, stream, keepPortable); - } - - /// <summary> - /// Invokes the receiver. - /// </summary> - /// <param name="receiver">Receiver.</param> - /// <param name="grid">Grid.</param> - /// <param name="cache">Cache.</param> - /// <param name="stream">Stream.</param> - /// <param name="keepPortable">Portable flag.</param> - public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache, - IPortableStream stream, bool keepPortable) - { - var reader = grid.Marshaller.StartUnmarshal(stream, keepPortable); - - var size = reader.ReadInt(); - - var entries = new List<ICacheEntry<TK, TV>>(size); - - for (var i = 0; i < size; i++) - entries.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>())); - - receiver.Receive(grid.Cache<TK, TV>(cache, keepPortable), entries); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs deleted file mode 100644 index 3972bb0..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Events -{ - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.Linq; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Events; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Handle; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Unmanaged; - using Apache.Ignite.Core.Portable; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - - /// <summary> - /// Ignite events. - /// </summary> - internal class Events : PlatformTarget, IEvents - { - /// <summary> - /// Opcodes. - /// </summary> - protected enum Op - { - RemoteQuery = 1, - RemoteListen = 2, - StopRemoteListen = 3, - WaitForLocal = 4, - LocalQuery = 5, - RecordLocal = 6, - EnableLocal = 8, - DisableLocal = 9, - GetEnabledEvents = 10 - } - - /** Map from user func to local wrapper, needed for invoke/unsubscribe. */ - private readonly Dictionary<object, Dictionary<int, LocalHandledEventFilter>> _localFilters - = new Dictionary<object, Dictionary<int, LocalHandledEventFilter>>(); - - /** Grid. */ - protected readonly Ignite Ignite; - - /// <summary> - /// Initializes a new instance of the <see cref="Events"/> class. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="clusterGroup">Cluster group.</param> - public Events(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup) - : base(target, marsh) - { - Debug.Assert(clusterGroup != null); - - ClusterGroup = clusterGroup; - - Ignite = (Ignite) clusterGroup.Ignite; - } - - /** <inheritDoc /> */ - public virtual IEvents WithAsync() - { - return new EventsAsync(UU.EventsWithAsync(Target), Marshaller, ClusterGroup); - } - - /** <inheritDoc /> */ - public virtual bool IsAsync - { - get { return false; } - } - - /** <inheritDoc /> */ - public virtual IFuture GetFuture() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** <inheritDoc /> */ - public virtual IFuture<TResult> GetFuture<TResult>() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** <inheritDoc /> */ - public IClusterGroup ClusterGroup { get; private set; } - - /** <inheritDoc /> */ - public virtual List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types) - where T : IEvent - { - IgniteArgumentCheck.NotNull(filter, "filter"); - - return DoOutInOp((int) Op.RemoteQuery, - writer => - { - writer.Write(new PortableOrSerializableObjectHolder(filter)); - - writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds)); - - WriteEventTypes(types, writer); - }, - reader => ReadEvents<T>(reader)); - } - - /** <inheritDoc /> */ - public virtual Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true, - IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) - where T : IEvent - { - IgniteArgumentCheck.Ensure(bufSize > 0, "bufSize", "should be > 0"); - IgniteArgumentCheck.Ensure(interval == null || interval.Value.TotalMilliseconds > 0, "interval", "should be null or >= 0"); - - return DoOutInOp((int) Op.RemoteListen, - writer => - { - writer.WriteInt(bufSize); - writer.WriteLong((long) (interval == null ? 0 : interval.Value.TotalMilliseconds)); - writer.WriteBoolean(autoUnsubscribe); - - writer.WriteBoolean(localListener != null); - - if (localListener != null) - { - var listener = new RemoteListenEventFilter(Ignite, (id, e) => localListener.Invoke(id, (T) e)); - writer.WriteLong(Ignite.HandleRegistry.Allocate(listener)); - } - - writer.WriteBoolean(remoteFilter != null); - - if (remoteFilter != null) - writer.Write(new PortableOrSerializableObjectHolder(remoteFilter)); - - WriteEventTypes(types, writer); - }, - reader => Marshaller.StartUnmarshal(reader).ReadGuid() ?? Guid.Empty); - } - - /** <inheritDoc /> */ - public virtual void StopRemoteListen(Guid opId) - { - DoOutOp((int) Op.StopRemoteListen, writer => - { - Marshaller.StartMarshal(writer).WriteGuid(opId); - }); - } - - /** <inheritDoc /> */ - public IEvent WaitForLocal(params int[] types) - { - return WaitForLocal<IEvent>(null, types); - } - - /** <inheritDoc /> */ - public virtual T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent - { - long hnd = 0; - - try - { - return WaitForLocal0(filter, ref hnd, types); - } - finally - { - if (filter != null) - Ignite.HandleRegistry.Release(hnd); - } - } - - /** <inheritDoc /> */ - public List<IEvent> LocalQuery(params int[] types) - { - return DoOutInOp((int) Op.LocalQuery, - writer => WriteEventTypes(types, writer), - reader => ReadEvents<IEvent>(reader)); - } - - /** <inheritDoc /> */ - public void RecordLocal(IEvent evt) - { - throw new NotImplementedException("GG-10244"); - } - - /** <inheritDoc /> */ - public void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent - { - IgniteArgumentCheck.NotNull(listener, "listener"); - IgniteArgumentCheck.NotNullOrEmpty(types, "types"); - - foreach (var type in types) - LocalListen(listener, type); - } - - /** <inheritDoc /> */ - public bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent - { - lock (_localFilters) - { - Dictionary<int, LocalHandledEventFilter> filters; - - if (!_localFilters.TryGetValue(listener, out filters)) - return false; - - var success = false; - - // Should do this inside lock to avoid race with subscription - // ToArray is required because we are going to modify underlying dictionary during enumeration - foreach (var filter in GetLocalFilters(listener, types).ToArray()) - success |= UU.EventsStopLocalListen(Target, filter.Handle); - - return success; - } - } - - /** <inheritDoc /> */ - public void EnableLocal(params int[] types) - { - IgniteArgumentCheck.NotNullOrEmpty(types, "types"); - - DoOutOp((int)Op.EnableLocal, writer => WriteEventTypes(types, writer)); - } - - /** <inheritDoc /> */ - public void DisableLocal(params int[] types) - { - IgniteArgumentCheck.NotNullOrEmpty(types, "types"); - - DoOutOp((int)Op.DisableLocal, writer => WriteEventTypes(types, writer)); - } - - /** <inheritDoc /> */ - public int[] GetEnabledEvents() - { - return DoInOp((int)Op.GetEnabledEvents, reader => ReadEventTypes(reader)); - } - - /** <inheritDoc /> */ - public bool IsEnabled(int type) - { - return UU.EventsIsEnabled(Target, type); - } - - /// <summary> - /// Waits for the specified events. - /// </summary> - /// <typeparam name="T">Type of events.</typeparam> - /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param> - /// <param name="handle">The filter handle, if applicable.</param> - /// <param name="types">Types of the events to wait for. - /// If not provided, all events will be passed to the filter.</param> - /// <returns>Ignite event.</returns> - protected T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent - { - if (filter != null) - handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter - { - InvokeFunc = stream => InvokeLocalFilter(stream, filter) - }); - - var hnd = handle; - - return DoOutInOp((int)Op.WaitForLocal, - writer => - { - if (filter != null) - { - writer.WriteBoolean(true); - writer.WriteLong(hnd); - } - else - writer.WriteBoolean(false); - - WriteEventTypes(types, writer); - }, - reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader))); - } - - /// <summary> - /// Reads events from a portable stream. - /// </summary> - /// <typeparam name="T">Event type.</typeparam> - /// <param name="reader">Reader.</param> - /// <returns>Resulting list or null.</returns> - private List<T> ReadEvents<T>(IPortableStream reader) where T : IEvent - { - return ReadEvents<T>(Marshaller.StartUnmarshal(reader)); - } - - /// <summary> - /// Reads events from a portable reader. - /// </summary> - /// <typeparam name="T">Event type.</typeparam> - /// <param name="portableReader">Reader.</param> - /// <returns>Resulting list or null.</returns> - protected static List<T> ReadEvents<T>(PortableReaderImpl portableReader) where T : IEvent - { - var count = portableReader.RawReader().ReadInt(); - - if (count == -1) - return null; - - var result = new List<T>(count); - - for (var i = 0; i < count; i++) - result.Add(EventReader.Read<T>(portableReader)); - - return result; - } - - /// <summary> - /// Gets local filters by user listener and event type. - /// </summary> - /// <param name="listener">Listener.</param> - /// <param name="types">Types.</param> - /// <returns>Collection of local listener wrappers.</returns> - [SuppressMessage("ReSharper", "InconsistentlySynchronizedField", - Justification = "This private method should be always called within a lock on localFilters")] - private IEnumerable<LocalHandledEventFilter> GetLocalFilters(object listener, int[] types) - { - Dictionary<int, LocalHandledEventFilter> filters; - - if (!_localFilters.TryGetValue(listener, out filters)) - return Enumerable.Empty<LocalHandledEventFilter>(); - - if (types.Length == 0) - return filters.Values; - - return types.Select(type => - { - LocalHandledEventFilter filter; - - return filters.TryGetValue(type, out filter) ? filter : null; - }).Where(x => x != null); - } - - /// <summary> - /// Adds an event listener for local events. - /// </summary> - /// <typeparam name="T">Type of events.</typeparam> - /// <param name="listener">Predicate that is called on each received event.</param> - /// <param name="type">Event type for which this listener will be notified</param> - private void LocalListen<T>(IEventFilter<T> listener, int type) where T : IEvent - { - lock (_localFilters) - { - Dictionary<int, LocalHandledEventFilter> filters; - - if (!_localFilters.TryGetValue(listener, out filters)) - { - filters = new Dictionary<int, LocalHandledEventFilter>(); - - _localFilters[listener] = filters; - } - - LocalHandledEventFilter localFilter; - - if (!filters.TryGetValue(type, out localFilter)) - { - localFilter = CreateLocalFilter(listener, type); - - filters[type] = localFilter; - } - - UU.EventsLocalListen(Target, localFilter.Handle, type); - } - } - - /// <summary> - /// Creates a user filter wrapper. - /// </summary> - /// <typeparam name="T">Event object type.</typeparam> - /// <param name="listener">Listener.</param> - /// <param name="type">Event type.</param> - /// <returns>Created wrapper.</returns> - private LocalHandledEventFilter CreateLocalFilter<T>(IEventFilter<T> listener, int type) where T : IEvent - { - var result = new LocalHandledEventFilter( - stream => InvokeLocalFilter(stream, listener), - unused => - { - lock (_localFilters) - { - Dictionary<int, LocalHandledEventFilter> filters; - - if (_localFilters.TryGetValue(listener, out filters)) - { - filters.Remove(type); - - if (filters.Count == 0) - _localFilters.Remove(listener); - } - } - }); - - result.Handle = Ignite.HandleRegistry.Allocate(result); - - return result; - } - - /// <summary> - /// Invokes local filter using data from specified stream. - /// </summary> - /// <typeparam name="T">Event object type.</typeparam> - /// <param name="stream">The stream.</param> - /// <param name="listener">The listener.</param> - /// <returns>Filter invocation result.</returns> - private bool InvokeLocalFilter<T>(IPortableStream stream, IEventFilter<T> listener) where T : IEvent - { - var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream)); - - // No guid in local mode - return listener.Invoke(Guid.Empty, evt); - } - - /// <summary> - /// Writes the event types. - /// </summary> - /// <param name="types">Types.</param> - /// <param name="writer">Writer.</param> - private static void WriteEventTypes(int[] types, IPortableRawWriter writer) - { - if (types.Length == 0) - types = null; // empty array means no type filtering - - writer.WriteIntArray(types); - } - - /// <summary> - /// Writes the event types. - /// </summary> - /// <param name="reader">Reader.</param> - private int[] ReadEventTypes(IPortableStream reader) - { - return Marshaller.StartUnmarshal(reader).ReadIntArray(); - } - - /// <summary> - /// Local user filter wrapper. - /// </summary> - private class LocalEventFilter : IInteropCallback - { - /** */ - public Func<IPortableStream, bool> InvokeFunc; - - /** <inheritdoc /> */ - public int Invoke(IPortableStream stream) - { - return InvokeFunc(stream) ? 1 : 0; - } - } - - /// <summary> - /// Local user filter wrapper with handle. - /// </summary> - private class LocalHandledEventFilter : Handle<Func<IPortableStream, bool>>, IInteropCallback - { - /** */ - public long Handle; - - /** <inheritdoc /> */ - public int Invoke(IPortableStream stream) - { - return Target(stream) ? 1 : 0; - } - - /// <summary> - /// Initializes a new instance of the <see cref="LocalHandledEventFilter"/> class. - /// </summary> - /// <param name="invokeFunc">The invoke function.</param> - /// <param name="releaseAction">The release action.</param> - public LocalHandledEventFilter( - Func<IPortableStream, bool> invokeFunc, Action<Func<IPortableStream, bool>> releaseAction) - : base(invokeFunc, releaseAction) - { - // No-op. - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs deleted file mode 100644 index 632d8b8..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Events -{ - using System; - using System.Collections.Generic; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Events; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - - /// <summary> - /// Async Ignite events. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class EventsAsync : Events - { - /** */ - private readonly ThreadLocal<int> _lastAsyncOp = new ThreadLocal<int>(() => OpNone); - - /** */ - private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>(); - - /// <summary> - /// Initializes a new instance of the <see cref="Events"/> class. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="clusterGroup">Cluster group.</param> - public EventsAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup) - : base(target, marsh, clusterGroup) - { - // No-op. - } - - /** <inheritdoc /> */ - public override List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types) - { - _lastAsyncOp.Value = (int) Op.RemoteQuery; - - var result = base.RemoteQuery(filter, timeout, types); - - // Result is a List<T> so we can't create proper converter later in GetFuture call from user. - // ReSharper disable once RedundantTypeArgumentsOfMethod (otherwise won't compile in VS2010 / TC) - _curFut.Value = GetFuture<List<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, - (int) Op.RemoteQuery), convertFunc: ReadEvents<T>); - - return result; - } - - /** <inheritdoc /> */ - public override Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true, - IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) - { - _lastAsyncOp.Value = (int) Op.RemoteListen; - _curFut.Value = null; - - return base.RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, types); - } - - /** <inheritdoc /> */ - public override void StopRemoteListen(Guid opId) - { - _lastAsyncOp.Value = (int) Op.StopRemoteListen; - _curFut.Value = null; - - base.StopRemoteListen(opId); - } - - /** <inheritdoc /> */ - public override T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) - { - _lastAsyncOp.Value = (int) Op.WaitForLocal; - - long hnd = 0; - - try - { - var result = WaitForLocal0(filter, ref hnd, types); - - if (filter != null) - { - // Dispose handle as soon as future ends. - var fut = GetFuture<T>(); - - _curFut.Value = fut; - - fut.Listen(() => Ignite.HandleRegistry.Release(hnd)); - } - else - _curFut.Value = null; - - return result; - } - catch (Exception) - { - Ignite.HandleRegistry.Release(hnd); - throw; - } - } - - /** <inheritdoc /> */ - public override IEvents WithAsync() - { - return this; - } - - /** <inheritdoc /> */ - public override bool IsAsync - { - get { return true; } - } - - /** <inheritdoc /> */ - public override IFuture GetFuture() - { - return GetFuture<object>(); - } - - /** <inheritdoc /> */ - public override IFuture<T> GetFuture<T>() - { - if (_curFut.Value != null) - { - var fut = _curFut.Value; - _curFut.Value = null; - return (IFuture<T>) fut; - } - - Func<PortableReaderImpl, T> converter = null; - - if (_lastAsyncOp.Value == (int) Op.WaitForLocal) - converter = reader => (T) EventReader.Read<IEvent>(reader); - - return GetFuture((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, _lastAsyncOp.Value), - convertFunc: converter); - } - } -} \ No newline at end of file
