http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs new file mode 100644 index 0000000..bf11397 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -0,0 +1,832 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Datastream +{ + using System; + using System.Collections.Generic; + using System.Threading; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Datastream; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Unmanaged; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Data streamer internal interface to get rid of generics. + /// </summary> + internal interface IDataStreamer + { + /// <summary> + /// Callback invoked on topology size change. + /// </summary> + /// <param name="topVer">New topology version.</param> + /// <param name="topSize">New topology size.</param> + void TopologyChange(long topVer, int topSize); + } + + /// <summary> + /// Data streamer implementation. + /// </summary> + internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV> + { + +#pragma warning disable 0420 + + /** Policy: continue. */ + internal const int PlcContinue = 0; + + /** Policy: close. */ + internal const int PlcClose = 1; + + /** Policy: cancel and close. */ + internal const int PlcCancelClose = 2; + + /** Policy: flush. */ + internal const int PlcFlush = 3; + + /** Operation: update. */ + private const int OpUpdate = 1; + + /** Operation: set receiver. */ + private const int OpReceiver = 2; + + /** Cache name. */ + private readonly string _cacheName; + + /** Lock. */ + private readonly ReaderWriterLockSlim _rwLock = new ReaderWriterLockSlim(); + + /** Closed event. */ + private readonly ManualResetEventSlim _closedEvt = new ManualResetEventSlim(false); + + /** Close future. */ + private readonly Future<object> _closeFut = new Future<object>(); + + /** GC handle to this streamer. */ + private readonly long _hnd; + + /** Topology version. */ + private long _topVer; + + /** Topology size. */ + private int _topSize; + + /** Buffer send size. */ + private volatile int _bufSndSize; + + /** Current data streamer batch. */ + private volatile DataStreamerBatch<TK, TV> _batch; + + /** Flusher. */ + private readonly Flusher<TK, TV> _flusher; + + /** Receiver. */ + private volatile IStreamReceiver<TK, TV> _rcv; + + /** Receiver handle. */ + private long _rcvHnd; + + /** Receiver portable mode. */ + private readonly bool _keepPortable; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + /// <param name="cacheName">Cache name.</param> + /// <param name="keepPortable">Portable flag.</param> + public DataStreamerImpl(IUnmanagedTarget target, PortableMarshaller marsh, string cacheName, bool keepPortable) + : base(target, marsh) + { + _cacheName = cacheName; + _keepPortable = keepPortable; + + // Create empty batch. + _batch = new DataStreamerBatch<TK, TV>(); + + // Allocate GC handle so that this data streamer could be easily dereferenced from native code. + WeakReference thisRef = new WeakReference(this); + + _hnd = marsh.Ignite.HandleRegistry.Allocate(thisRef); + + // Start topology listening. This call will ensure that buffer size member is updated. + UU.DataStreamerListenTopology(target, _hnd); + + // Membar to ensure fields initialization before leaving constructor. + Thread.MemoryBarrier(); + + // Start flusher after everything else is initialized. + _flusher = new Flusher<TK, TV>(thisRef); + + _flusher.RunThread(); + } + + /** <inheritDoc /> */ + public string CacheName + { + get { return _cacheName; } + } + + /** <inheritDoc /> */ + public bool AllowOverwrite + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return UU.DataStreamerAllowOverwriteGet(Target); + } + finally + { + _rwLock.ExitReadLock(); + } + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + UU.DataStreamerAllowOverwriteSet(Target, value); + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** <inheritDoc /> */ + public bool SkipStore + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return UU.DataStreamerSkipStoreGet(Target); + } + finally + { + _rwLock.ExitReadLock(); + } + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + UU.DataStreamerSkipStoreSet(Target, value); + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** <inheritDoc /> */ + public int PerNodeBufferSize + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return UU.DataStreamerPerNodeBufferSizeGet(Target); + } + finally + { + _rwLock.ExitReadLock(); + } + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + UU.DataStreamerPerNodeBufferSizeSet(Target, value); + + _bufSndSize = _topSize * value; + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** <inheritDoc /> */ + public int PerNodeParallelOperations + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return UU.DataStreamerPerNodeParallelOperationsGet(Target); + } + finally + { + _rwLock.ExitReadLock(); + } + + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + UU.DataStreamerPerNodeParallelOperationsSet(Target, value); + } + finally + { + _rwLock.ExitWriteLock(); + } + + } + } + + /** <inheritDoc /> */ + public long AutoFlushFrequency + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return _flusher.Frequency; + } + finally + { + _rwLock.ExitReadLock(); + } + + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + _flusher.Frequency = value; + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** <inheritDoc /> */ + public IFuture Future + { + get + { + ThrowIfDisposed(); + + return _closeFut; + } + } + + /** <inheritDoc /> */ + public IStreamReceiver<TK, TV> Receiver + { + get + { + ThrowIfDisposed(); + + return _rcv; + } + set + { + IgniteArgumentCheck.NotNull(value, "value"); + + var handleRegistry = Marshaller.Ignite.HandleRegistry; + + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + if (_rcv == value) + return; + + var rcvHolder = new StreamReceiverHolder(value, + (rec, grid, cache, stream, keepPortable) => + StreamReceiverHolder.InvokeReceiver((IStreamReceiver<TK, TV>) rec, grid, cache, stream, + keepPortable)); + + var rcvHnd0 = handleRegistry.Allocate(rcvHolder); + + try + { + DoOutOp(OpReceiver, w => + { + w.WriteLong(rcvHnd0); + + w.WriteObject(rcvHolder); + }); + } + catch (Exception) + { + handleRegistry.Release(rcvHnd0); + throw; + } + + if (_rcv != null) + handleRegistry.Release(_rcvHnd); + + _rcv = value; + _rcvHnd = rcvHnd0; + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** <inheritDoc /> */ + public IFuture AddData(TK key, TV val) + { + ThrowIfDisposed(); + + IgniteArgumentCheck.NotNull(key, "key"); + + return Add0(new DataStreamerEntry<TK, TV>(key, val), 1); + } + + /** <inheritDoc /> */ + public IFuture AddData(KeyValuePair<TK, TV> pair) + { + ThrowIfDisposed(); + + return Add0(new DataStreamerEntry<TK, TV>(pair.Key, pair.Value), 1); + } + + /** <inheritDoc /> */ + public IFuture AddData(ICollection<KeyValuePair<TK, TV>> entries) + { + ThrowIfDisposed(); + + IgniteArgumentCheck.NotNull(entries, "entries"); + + return Add0(entries, entries.Count); + } + + /** <inheritDoc /> */ + public IFuture RemoveData(TK key) + { + ThrowIfDisposed(); + + IgniteArgumentCheck.NotNull(key, "key"); + + return Add0(new DataStreamerRemoveEntry<TK>(key), 1); + } + + /** <inheritDoc /> */ + public void TryFlush() + { + ThrowIfDisposed(); + + DataStreamerBatch<TK, TV> batch0 = _batch; + + if (batch0 != null) + Flush0(batch0, false, PlcFlush); + } + + /** <inheritDoc /> */ + public void Flush() + { + ThrowIfDisposed(); + + DataStreamerBatch<TK, TV> batch0 = _batch; + + if (batch0 != null) + Flush0(batch0, true, PlcFlush); + else + { + // Batch is null, i.e. data streamer is closing. Wait for close to complete. + _closedEvt.Wait(); + } + } + + /** <inheritDoc /> */ + public void Close(bool cancel) + { + _flusher.Stop(); + + while (true) + { + DataStreamerBatch<TK, TV> batch0 = _batch; + + if (batch0 == null) + { + // Wait for concurrent close to finish. + _closedEvt.Wait(); + + return; + } + + if (Flush0(batch0, true, cancel ? PlcCancelClose : PlcClose)) + { + _closeFut.OnDone(null, null); + + _rwLock.EnterWriteLock(); + + try + { + base.Dispose(true); + + if (_rcv != null) + Marshaller.Ignite.HandleRegistry.Release(_rcvHnd); + + _closedEvt.Set(); + } + finally + { + _rwLock.ExitWriteLock(); + } + + Marshaller.Ignite.HandleRegistry.Release(_hnd); + + break; + } + } + } + + /** <inheritDoc /> */ + public IDataStreamer<TK1, TV1> WithKeepPortable<TK1, TV1>() + { + if (_keepPortable) + { + var result = this as IDataStreamer<TK1, TV1>; + + if (result == null) + throw new InvalidOperationException( + "Can't change type of portable streamer. WithKeepPortable has been called on an instance of " + + "portable streamer with incompatible generic arguments."); + + return result; + } + + return new DataStreamerImpl<TK1, TV1>(UU.ProcessorDataStreamer(Marshaller.Ignite.InteropProcessor, + _cacheName, true), Marshaller, _cacheName, true); + } + + /** <inheritDoc /> */ + protected override void Dispose(bool disposing) + { + if (disposing) + Close(false); // Normal dispose: do not cancel + else + { + // Finalizer: just close Java streamer + try + { + if (_batch != null) + _batch.Send(this, PlcCancelClose); + } + catch (Exception) + { + // Finalizers should never throw + } + + Marshaller.Ignite.HandleRegistry.Release(_hnd, true); + Marshaller.Ignite.HandleRegistry.Release(_rcvHnd, true); + + base.Dispose(false); + } + } + + /** <inheritDoc /> */ + ~DataStreamerImpl() + { + Dispose(false); + } + + /** <inheritDoc /> */ + public void TopologyChange(long topVer, int topSize) + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + if (_topVer < topVer) + { + _topVer = topVer; + _topSize = topSize; + + _bufSndSize = topSize * UU.DataStreamerPerNodeBufferSizeGet(Target); + } + } + finally + { + _rwLock.ExitWriteLock(); + } + + } + + /// <summary> + /// Internal add/remove routine. + /// </summary> + /// <param name="val">Value.</param> + /// <param name="cnt">Items count.</param> + /// <returns>Future.</returns> + private IFuture Add0(object val, int cnt) + { + int bufSndSize0 = _bufSndSize; + + while (true) + { + var batch0 = _batch; + + if (batch0 == null) + throw new InvalidOperationException("Data streamer is stopped."); + + int size = batch0.Add(val, cnt); + + if (size == -1) + { + // Batch is blocked, perform CAS. + Interlocked.CompareExchange(ref _batch, + new DataStreamerBatch<TK, TV>(batch0), batch0); + + continue; + } + if (size >= bufSndSize0) + // Batch is too big, schedule flush. + Flush0(batch0, false, PlcContinue); + + return batch0.Future; + } + } + + /// <summary> + /// Internal flush routine. + /// </summary> + /// <param name="curBatch"></param> + /// <param name="wait">Whether to wait for flush to complete.</param> + /// <param name="plc">Whether this is the last batch.</param> + /// <returns>Whether this call was able to CAS previous batch</returns> + private bool Flush0(DataStreamerBatch<TK, TV> curBatch, bool wait, int plc) + { + // 1. Try setting new current batch to help further adders. + bool res = Interlocked.CompareExchange(ref _batch, + (plc == PlcContinue || plc == PlcFlush) ? + new DataStreamerBatch<TK, TV>(curBatch) : null, curBatch) == curBatch; + + // 2. Perform actual send. + curBatch.Send(this, plc); + + if (wait) + // 3. Wait for all futures to finish. + curBatch.AwaitCompletion(); + + return res; + } + + /// <summary> + /// Start write. + /// </summary> + /// <returns>Writer.</returns> + internal void Update(Action<PortableWriterImpl> action) + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + DoOutOp(OpUpdate, action); + } + finally + { + _rwLock.ExitReadLock(); + } + } + + /// <summary> + /// Flusher. + /// </summary> + private class Flusher<TK1, TV1> + { + /** State: running. */ + private const int StateRunning = 0; + + /** State: stopping. */ + private const int StateStopping = 1; + + /** State: stopped. */ + private const int StateStopped = 2; + + /** Data streamer. */ + private readonly WeakReference _ldrRef; + + /** Finish flag. */ + private int _state; + + /** Flush frequency. */ + private long _freq; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="ldrRef">Data streamer weak reference..</param> + public Flusher(WeakReference ldrRef) + { + _ldrRef = ldrRef; + + lock (this) + { + _state = StateRunning; + } + } + + /// <summary> + /// Main flusher routine. + /// </summary> + private void Run() + { + bool force = false; + long curFreq = 0; + + try + { + while (true) + { + if (curFreq > 0 || force) + { + var ldr = _ldrRef.Target as DataStreamerImpl<TK1, TV1>; + + if (ldr == null) + return; + + ldr.TryFlush(); + + force = false; + } + + lock (this) + { + // Stop immediately. + if (_state == StateStopping) + return; + + if (curFreq == _freq) + { + // Frequency is unchanged + if (curFreq == 0) + // Just wait for a second and re-try. + Monitor.Wait(this, 1000); + else + { + // Calculate remaining time. + DateTime now = DateTime.Now; + + long ticks; + + try + { + ticks = now.AddMilliseconds(curFreq).Ticks - now.Ticks; + + if (ticks > int.MaxValue) + ticks = int.MaxValue; + } + catch (ArgumentOutOfRangeException) + { + // Handle possible overflow. + ticks = int.MaxValue; + } + + Monitor.Wait(this, TimeSpan.FromTicks(ticks)); + } + } + else + { + if (curFreq != 0) + force = true; + + curFreq = _freq; + } + } + } + } + finally + { + // Let streamer know about stop. + lock (this) + { + _state = StateStopped; + + Monitor.PulseAll(this); + } + } + } + + /// <summary> + /// Frequency. + /// </summary> + public long Frequency + { + get + { + return Interlocked.Read(ref _freq); + } + + set + { + lock (this) + { + if (_freq != value) + { + _freq = value; + + Monitor.PulseAll(this); + } + } + } + } + + /// <summary> + /// Stop flusher. + /// </summary> + public void Stop() + { + lock (this) + { + if (_state == StateRunning) + { + _state = StateStopping; + + Monitor.PulseAll(this); + } + + while (_state != StateStopped) + Monitor.Wait(this); + } + } + + /// <summary> + /// Runs the flusher thread. + /// </summary> + public void RunThread() + { + new Thread(Run).Start(); + } + } + +#pragma warning restore 0420 + + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs new file mode 100644 index 0000000..7e65934 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerRemoveEntry.cs @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Datastream +{ + /// <summary> + /// Remove marker. + /// </summary> + internal class DataStreamerRemoveEntry<TK> + { + /** Key to remove. */ + private readonly TK _key; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="key">Key.</param> + public DataStreamerRemoveEntry(TK key) + { + _key = key; + } + + /// <summary> + /// Key. + /// </summary> + public TK Key + { + get + { + return _key; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs new file mode 100644 index 0000000..5a7c104 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Datastream +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Datastream; + using Apache.Ignite.Core.Impl.Cache; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Portable wrapper for <see cref="IStreamReceiver{TK,TV}"/>. + /// </summary> + internal class StreamReceiverHolder : IPortableWriteAware + { + /** */ + private const byte RcvNormal = 0; + + /** */ + public const byte RcvTransformer = 1; + + /** Generic receiver. */ + private readonly object _rcv; + + /** Invoker delegate. */ + private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _invoke; + + /// <summary> + /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + public StreamReceiverHolder(PortableReaderImpl reader) + { + var rcvType = reader.ReadByte(); + + _rcv = PortableUtils.ReadPortableOrSerializable<object>(reader); + + Debug.Assert(_rcv != null); + + var type = _rcv.GetType(); + + if (rcvType == RcvTransformer) + { + // rcv is a user ICacheEntryProcessor<K, V, A, R>, construct StreamTransformer from it. + // (we can't marshal StreamTransformer directly, because it is generic, + // and we do not know type arguments that user will have) + _rcv = DelegateTypeDescriptor.GetStreamTransformerCtor(type)(_rcv); + } + + _invoke = DelegateTypeDescriptor.GetStreamReceiver(_rcv.GetType()); + } + + /// <summary> + /// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class. + /// </summary> + /// <param name="rcv">Receiver.</param> + /// <param name="invoke">Invoke delegate.</param> + public StreamReceiverHolder(object rcv, + Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> invoke) + { + Debug.Assert(rcv != null); + Debug.Assert(invoke != null); + + _rcv = rcv; + _invoke = invoke; + } + + /** <inheritdoc /> */ + public void WritePortable(IPortableWriter writer) + { + var w = writer.RawWriter(); + + var writeAware = _rcv as IPortableWriteAware; + + if (writeAware != null) + writeAware.WritePortable(writer); + else + { + w.WriteByte(RcvNormal); + PortableUtils.WritePortableOrSerializable((PortableWriterImpl) writer, _rcv); + } + } + + /// <summary> + /// Updates cache with batch of entries. + /// </summary> + /// <param name="grid">The grid.</param> + /// <param name="cache">Cache.</param> + /// <param name="stream">Stream.</param> + /// <param name="keepPortable">Portable flag.</param> + public void Receive(Ignite grid, IUnmanagedTarget cache, IPortableStream stream, bool keepPortable) + { + Debug.Assert(grid != null); + Debug.Assert(cache != null); + Debug.Assert(stream != null); + + _invoke(_rcv, grid, cache, stream, keepPortable); + } + + /// <summary> + /// Invokes the receiver. + /// </summary> + /// <param name="receiver">Receiver.</param> + /// <param name="grid">Grid.</param> + /// <param name="cache">Cache.</param> + /// <param name="stream">Stream.</param> + /// <param name="keepPortable">Portable flag.</param> + public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache, + IPortableStream stream, bool keepPortable) + { + var reader = grid.Marshaller.StartUnmarshal(stream, keepPortable); + + var size = reader.ReadInt(); + + var entries = new List<ICacheEntry<TK, TV>>(size); + + for (var i = 0; i < size; i++) + entries.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>())); + + receiver.Receive(grid.Cache<TK, TV>(cache, keepPortable), entries); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs new file mode 100644 index 0000000..3972bb0 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Events +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using System.Linq; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Handle; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Portable; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Ignite events. + /// </summary> + internal class Events : PlatformTarget, IEvents + { + /// <summary> + /// Opcodes. + /// </summary> + protected enum Op + { + RemoteQuery = 1, + RemoteListen = 2, + StopRemoteListen = 3, + WaitForLocal = 4, + LocalQuery = 5, + RecordLocal = 6, + EnableLocal = 8, + DisableLocal = 9, + GetEnabledEvents = 10 + } + + /** Map from user func to local wrapper, needed for invoke/unsubscribe. */ + private readonly Dictionary<object, Dictionary<int, LocalHandledEventFilter>> _localFilters + = new Dictionary<object, Dictionary<int, LocalHandledEventFilter>>(); + + /** Grid. */ + protected readonly Ignite Ignite; + + /// <summary> + /// Initializes a new instance of the <see cref="Events"/> class. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + /// <param name="clusterGroup">Cluster group.</param> + public Events(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup) + : base(target, marsh) + { + Debug.Assert(clusterGroup != null); + + ClusterGroup = clusterGroup; + + Ignite = (Ignite) clusterGroup.Ignite; + } + + /** <inheritDoc /> */ + public virtual IEvents WithAsync() + { + return new EventsAsync(UU.EventsWithAsync(Target), Marshaller, ClusterGroup); + } + + /** <inheritDoc /> */ + public virtual bool IsAsync + { + get { return false; } + } + + /** <inheritDoc /> */ + public virtual IFuture GetFuture() + { + throw IgniteUtils.GetAsyncModeDisabledException(); + } + + /** <inheritDoc /> */ + public virtual IFuture<TResult> GetFuture<TResult>() + { + throw IgniteUtils.GetAsyncModeDisabledException(); + } + + /** <inheritDoc /> */ + public IClusterGroup ClusterGroup { get; private set; } + + /** <inheritDoc /> */ + public virtual List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types) + where T : IEvent + { + IgniteArgumentCheck.NotNull(filter, "filter"); + + return DoOutInOp((int) Op.RemoteQuery, + writer => + { + writer.Write(new PortableOrSerializableObjectHolder(filter)); + + writer.WriteLong((long) (timeout == null ? 0 : timeout.Value.TotalMilliseconds)); + + WriteEventTypes(types, writer); + }, + reader => ReadEvents<T>(reader)); + } + + /** <inheritDoc /> */ + public virtual Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true, + IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) + where T : IEvent + { + IgniteArgumentCheck.Ensure(bufSize > 0, "bufSize", "should be > 0"); + IgniteArgumentCheck.Ensure(interval == null || interval.Value.TotalMilliseconds > 0, "interval", "should be null or >= 0"); + + return DoOutInOp((int) Op.RemoteListen, + writer => + { + writer.WriteInt(bufSize); + writer.WriteLong((long) (interval == null ? 0 : interval.Value.TotalMilliseconds)); + writer.WriteBoolean(autoUnsubscribe); + + writer.WriteBoolean(localListener != null); + + if (localListener != null) + { + var listener = new RemoteListenEventFilter(Ignite, (id, e) => localListener.Invoke(id, (T) e)); + writer.WriteLong(Ignite.HandleRegistry.Allocate(listener)); + } + + writer.WriteBoolean(remoteFilter != null); + + if (remoteFilter != null) + writer.Write(new PortableOrSerializableObjectHolder(remoteFilter)); + + WriteEventTypes(types, writer); + }, + reader => Marshaller.StartUnmarshal(reader).ReadGuid() ?? Guid.Empty); + } + + /** <inheritDoc /> */ + public virtual void StopRemoteListen(Guid opId) + { + DoOutOp((int) Op.StopRemoteListen, writer => + { + Marshaller.StartMarshal(writer).WriteGuid(opId); + }); + } + + /** <inheritDoc /> */ + public IEvent WaitForLocal(params int[] types) + { + return WaitForLocal<IEvent>(null, types); + } + + /** <inheritDoc /> */ + public virtual T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) where T : IEvent + { + long hnd = 0; + + try + { + return WaitForLocal0(filter, ref hnd, types); + } + finally + { + if (filter != null) + Ignite.HandleRegistry.Release(hnd); + } + } + + /** <inheritDoc /> */ + public List<IEvent> LocalQuery(params int[] types) + { + return DoOutInOp((int) Op.LocalQuery, + writer => WriteEventTypes(types, writer), + reader => ReadEvents<IEvent>(reader)); + } + + /** <inheritDoc /> */ + public void RecordLocal(IEvent evt) + { + throw new NotImplementedException("GG-10244"); + } + + /** <inheritDoc /> */ + public void LocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent + { + IgniteArgumentCheck.NotNull(listener, "listener"); + IgniteArgumentCheck.NotNullOrEmpty(types, "types"); + + foreach (var type in types) + LocalListen(listener, type); + } + + /** <inheritDoc /> */ + public bool StopLocalListen<T>(IEventFilter<T> listener, params int[] types) where T : IEvent + { + lock (_localFilters) + { + Dictionary<int, LocalHandledEventFilter> filters; + + if (!_localFilters.TryGetValue(listener, out filters)) + return false; + + var success = false; + + // Should do this inside lock to avoid race with subscription + // ToArray is required because we are going to modify underlying dictionary during enumeration + foreach (var filter in GetLocalFilters(listener, types).ToArray()) + success |= UU.EventsStopLocalListen(Target, filter.Handle); + + return success; + } + } + + /** <inheritDoc /> */ + public void EnableLocal(params int[] types) + { + IgniteArgumentCheck.NotNullOrEmpty(types, "types"); + + DoOutOp((int)Op.EnableLocal, writer => WriteEventTypes(types, writer)); + } + + /** <inheritDoc /> */ + public void DisableLocal(params int[] types) + { + IgniteArgumentCheck.NotNullOrEmpty(types, "types"); + + DoOutOp((int)Op.DisableLocal, writer => WriteEventTypes(types, writer)); + } + + /** <inheritDoc /> */ + public int[] GetEnabledEvents() + { + return DoInOp((int)Op.GetEnabledEvents, reader => ReadEventTypes(reader)); + } + + /** <inheritDoc /> */ + public bool IsEnabled(int type) + { + return UU.EventsIsEnabled(Target, type); + } + + /// <summary> + /// Waits for the specified events. + /// </summary> + /// <typeparam name="T">Type of events.</typeparam> + /// <param name="filter">Optional filtering predicate. Event wait will end as soon as it returns false.</param> + /// <param name="handle">The filter handle, if applicable.</param> + /// <param name="types">Types of the events to wait for. + /// If not provided, all events will be passed to the filter.</param> + /// <returns>Ignite event.</returns> + protected T WaitForLocal0<T>(IEventFilter<T> filter, ref long handle, params int[] types) where T : IEvent + { + if (filter != null) + handle = Ignite.HandleRegistry.Allocate(new LocalEventFilter + { + InvokeFunc = stream => InvokeLocalFilter(stream, filter) + }); + + var hnd = handle; + + return DoOutInOp((int)Op.WaitForLocal, + writer => + { + if (filter != null) + { + writer.WriteBoolean(true); + writer.WriteLong(hnd); + } + else + writer.WriteBoolean(false); + + WriteEventTypes(types, writer); + }, + reader => EventReader.Read<T>(Marshaller.StartUnmarshal(reader))); + } + + /// <summary> + /// Reads events from a portable stream. + /// </summary> + /// <typeparam name="T">Event type.</typeparam> + /// <param name="reader">Reader.</param> + /// <returns>Resulting list or null.</returns> + private List<T> ReadEvents<T>(IPortableStream reader) where T : IEvent + { + return ReadEvents<T>(Marshaller.StartUnmarshal(reader)); + } + + /// <summary> + /// Reads events from a portable reader. + /// </summary> + /// <typeparam name="T">Event type.</typeparam> + /// <param name="portableReader">Reader.</param> + /// <returns>Resulting list or null.</returns> + protected static List<T> ReadEvents<T>(PortableReaderImpl portableReader) where T : IEvent + { + var count = portableReader.RawReader().ReadInt(); + + if (count == -1) + return null; + + var result = new List<T>(count); + + for (var i = 0; i < count; i++) + result.Add(EventReader.Read<T>(portableReader)); + + return result; + } + + /// <summary> + /// Gets local filters by user listener and event type. + /// </summary> + /// <param name="listener">Listener.</param> + /// <param name="types">Types.</param> + /// <returns>Collection of local listener wrappers.</returns> + [SuppressMessage("ReSharper", "InconsistentlySynchronizedField", + Justification = "This private method should be always called within a lock on localFilters")] + private IEnumerable<LocalHandledEventFilter> GetLocalFilters(object listener, int[] types) + { + Dictionary<int, LocalHandledEventFilter> filters; + + if (!_localFilters.TryGetValue(listener, out filters)) + return Enumerable.Empty<LocalHandledEventFilter>(); + + if (types.Length == 0) + return filters.Values; + + return types.Select(type => + { + LocalHandledEventFilter filter; + + return filters.TryGetValue(type, out filter) ? filter : null; + }).Where(x => x != null); + } + + /// <summary> + /// Adds an event listener for local events. + /// </summary> + /// <typeparam name="T">Type of events.</typeparam> + /// <param name="listener">Predicate that is called on each received event.</param> + /// <param name="type">Event type for which this listener will be notified</param> + private void LocalListen<T>(IEventFilter<T> listener, int type) where T : IEvent + { + lock (_localFilters) + { + Dictionary<int, LocalHandledEventFilter> filters; + + if (!_localFilters.TryGetValue(listener, out filters)) + { + filters = new Dictionary<int, LocalHandledEventFilter>(); + + _localFilters[listener] = filters; + } + + LocalHandledEventFilter localFilter; + + if (!filters.TryGetValue(type, out localFilter)) + { + localFilter = CreateLocalFilter(listener, type); + + filters[type] = localFilter; + } + + UU.EventsLocalListen(Target, localFilter.Handle, type); + } + } + + /// <summary> + /// Creates a user filter wrapper. + /// </summary> + /// <typeparam name="T">Event object type.</typeparam> + /// <param name="listener">Listener.</param> + /// <param name="type">Event type.</param> + /// <returns>Created wrapper.</returns> + private LocalHandledEventFilter CreateLocalFilter<T>(IEventFilter<T> listener, int type) where T : IEvent + { + var result = new LocalHandledEventFilter( + stream => InvokeLocalFilter(stream, listener), + unused => + { + lock (_localFilters) + { + Dictionary<int, LocalHandledEventFilter> filters; + + if (_localFilters.TryGetValue(listener, out filters)) + { + filters.Remove(type); + + if (filters.Count == 0) + _localFilters.Remove(listener); + } + } + }); + + result.Handle = Ignite.HandleRegistry.Allocate(result); + + return result; + } + + /// <summary> + /// Invokes local filter using data from specified stream. + /// </summary> + /// <typeparam name="T">Event object type.</typeparam> + /// <param name="stream">The stream.</param> + /// <param name="listener">The listener.</param> + /// <returns>Filter invocation result.</returns> + private bool InvokeLocalFilter<T>(IPortableStream stream, IEventFilter<T> listener) where T : IEvent + { + var evt = EventReader.Read<T>(Marshaller.StartUnmarshal(stream)); + + // No guid in local mode + return listener.Invoke(Guid.Empty, evt); + } + + /// <summary> + /// Writes the event types. + /// </summary> + /// <param name="types">Types.</param> + /// <param name="writer">Writer.</param> + private static void WriteEventTypes(int[] types, IPortableRawWriter writer) + { + if (types.Length == 0) + types = null; // empty array means no type filtering + + writer.WriteIntArray(types); + } + + /// <summary> + /// Writes the event types. + /// </summary> + /// <param name="reader">Reader.</param> + private int[] ReadEventTypes(IPortableStream reader) + { + return Marshaller.StartUnmarshal(reader).ReadIntArray(); + } + + /// <summary> + /// Local user filter wrapper. + /// </summary> + private class LocalEventFilter : IInteropCallback + { + /** */ + public Func<IPortableStream, bool> InvokeFunc; + + /** <inheritdoc /> */ + public int Invoke(IPortableStream stream) + { + return InvokeFunc(stream) ? 1 : 0; + } + } + + /// <summary> + /// Local user filter wrapper with handle. + /// </summary> + private class LocalHandledEventFilter : Handle<Func<IPortableStream, bool>>, IInteropCallback + { + /** */ + public long Handle; + + /** <inheritdoc /> */ + public int Invoke(IPortableStream stream) + { + return Target(stream) ? 1 : 0; + } + + /// <summary> + /// Initializes a new instance of the <see cref="LocalHandledEventFilter"/> class. + /// </summary> + /// <param name="invokeFunc">The invoke function.</param> + /// <param name="releaseAction">The release action.</param> + public LocalHandledEventFilter( + Func<IPortableStream, bool> invokeFunc, Action<Func<IPortableStream, bool>> releaseAction) + : base(invokeFunc, releaseAction) + { + // No-op. + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs new file mode 100644 index 0000000..632d8b8 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/EventsAsync.cs @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Events +{ + using System; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.Threading; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Unmanaged; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Async Ignite events. + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] + internal class EventsAsync : Events + { + /** */ + private readonly ThreadLocal<int> _lastAsyncOp = new ThreadLocal<int>(() => OpNone); + + /** */ + private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>(); + + /// <summary> + /// Initializes a new instance of the <see cref="Events"/> class. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + /// <param name="clusterGroup">Cluster group.</param> + public EventsAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup) + : base(target, marsh, clusterGroup) + { + // No-op. + } + + /** <inheritdoc /> */ + public override List<T> RemoteQuery<T>(IEventFilter<T> filter, TimeSpan? timeout = null, params int[] types) + { + _lastAsyncOp.Value = (int) Op.RemoteQuery; + + var result = base.RemoteQuery(filter, timeout, types); + + // Result is a List<T> so we can't create proper converter later in GetFuture call from user. + // ReSharper disable once RedundantTypeArgumentsOfMethod (otherwise won't compile in VS2010 / TC) + _curFut.Value = GetFuture<List<T>>((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, + (int) Op.RemoteQuery), convertFunc: ReadEvents<T>); + + return result; + } + + /** <inheritdoc /> */ + public override Guid RemoteListen<T>(int bufSize = 1, TimeSpan? interval = null, bool autoUnsubscribe = true, + IEventFilter<T> localListener = null, IEventFilter<T> remoteFilter = null, params int[] types) + { + _lastAsyncOp.Value = (int) Op.RemoteListen; + _curFut.Value = null; + + return base.RemoteListen(bufSize, interval, autoUnsubscribe, localListener, remoteFilter, types); + } + + /** <inheritdoc /> */ + public override void StopRemoteListen(Guid opId) + { + _lastAsyncOp.Value = (int) Op.StopRemoteListen; + _curFut.Value = null; + + base.StopRemoteListen(opId); + } + + /** <inheritdoc /> */ + public override T WaitForLocal<T>(IEventFilter<T> filter, params int[] types) + { + _lastAsyncOp.Value = (int) Op.WaitForLocal; + + long hnd = 0; + + try + { + var result = WaitForLocal0(filter, ref hnd, types); + + if (filter != null) + { + // Dispose handle as soon as future ends. + var fut = GetFuture<T>(); + + _curFut.Value = fut; + + fut.Listen(() => Ignite.HandleRegistry.Release(hnd)); + } + else + _curFut.Value = null; + + return result; + } + catch (Exception) + { + Ignite.HandleRegistry.Release(hnd); + throw; + } + } + + /** <inheritdoc /> */ + public override IEvents WithAsync() + { + return this; + } + + /** <inheritdoc /> */ + public override bool IsAsync + { + get { return true; } + } + + /** <inheritdoc /> */ + public override IFuture GetFuture() + { + return GetFuture<object>(); + } + + /** <inheritdoc /> */ + public override IFuture<T> GetFuture<T>() + { + if (_curFut.Value != null) + { + var fut = _curFut.Value; + _curFut.Value = null; + return (IFuture<T>) fut; + } + + Func<PortableReaderImpl, T> converter = null; + + if (_lastAsyncOp.Value == (int) Op.WaitForLocal) + converter = reader => (T) EventReader.Read<IEvent>(reader); + + return GetFuture((futId, futTyp) => UU.TargetListenFutureForOperation(Target, futId, futTyp, _lastAsyncOp.Value), + convertFunc: converter); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs new file mode 100644 index 0000000..8b44966 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Events/RemoteListenEventFilter.cs @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Events +{ + using System; + using System.Diagnostics; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Portable.IO; + + /// <summary> + /// Event filter/listener holder for RemoteListen. + /// </summary> + internal class RemoteListenEventFilter : IInteropCallback + { + /** */ + private readonly Ignite _ignite; + + /** */ + private readonly Func<Guid, IEvent, bool> _filter; + + /// <summary> + /// Initializes a new instance of the <see cref="RemoteListenEventFilter"/> class. + /// </summary> + /// <param name="ignite">The grid.</param> + /// <param name="filter">The filter.</param> + public RemoteListenEventFilter(Ignite ignite, Func<Guid, IEvent, bool> filter) + { + _ignite = ignite; + _filter = filter; + } + + /** <inheritdoc /> */ + public int Invoke(IPortableStream stream) + { + var reader = _ignite.Marshaller.StartUnmarshal(stream); + + var evt = EventReader.Read<IEvent>(reader); + + var nodeId = reader.ReadGuid() ?? Guid.Empty; + + return _filter(nodeId, evt) ? 1 : 0; + } + + /// <summary> + /// Creates an instance of this class from a stream. + /// </summary> + /// <param name="memPtr">Memory pointer.</param> + /// <param name="grid">Grid</param> + /// <returns>Deserialized instance of <see cref="RemoteListenEventFilter"/></returns> + public static RemoteListenEventFilter CreateInstance(long memPtr, Ignite grid) + { + Debug.Assert(grid != null); + + using (var stream = IgniteManager.Memory.Get(memPtr).Stream()) + { + var marsh = grid.Marshaller; + + var reader = marsh.StartUnmarshal(stream); + + var pred = reader.ReadObject<PortableOrSerializableObjectHolder>().Item; + + var func = DelegateTypeDescriptor.GetEventFilter(pred.GetType()); + + return new RemoteListenEventFilter(grid, (id, evt) => func(pred, id, evt)); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs new file mode 100644 index 0000000..066f345 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Runtime.InteropServices; + using System.Security; + using System.Threading; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Store; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Transactions; + + /// <summary> + /// Managed environment. Acts as a gateway for native code. + /// </summary> + [StructLayout(LayoutKind.Sequential)] + internal static class ExceptionUtils + { + /** NoClassDefFoundError fully-qualified class name which is important during startup phase. */ + private const string ClsNoClsDefFoundErr = "java.lang.NoClassDefFoundError"; + + /** NoSuchMethodError fully-qualified class name which is important during startup phase. */ + private const string ClsNoSuchMthdErr = "java.lang.NoSuchMethodError"; + + /** InteropCachePartialUpdateException. */ + private const string ClsCachePartialUpdateErr = "org.apache.ignite.internal.processors.platform.cache.PlatformCachePartialUpdateException"; + + /** Map with predefined exceptions. */ + private static readonly IDictionary<string, ExceptionFactoryDelegate> EXS = new Dictionary<string, ExceptionFactoryDelegate>(); + + /** Exception factory delegate. */ + private delegate Exception ExceptionFactoryDelegate(string msg); + + /// <summary> + /// Static initializer. + /// </summary> + static ExceptionUtils() + { + // Common Java exceptions mapped to common .Net exceptions. + EXS["java.lang.IllegalArgumentException"] = m => new ArgumentException(m); + EXS["java.lang.IllegalStateException"] = m => new InvalidOperationException(m); + EXS["java.lang.UnsupportedOperationException"] = m => new NotImplementedException(m); + EXS["java.lang.InterruptedException"] = m => new ThreadInterruptedException(m); + + // Generic Ignite exceptions. + EXS["org.apache.ignite.IgniteException"] = m => new IgniteException(m); + EXS["org.apache.ignite.IgniteCheckedException"] = m => new IgniteException(m); + + // Cluster exceptions. + EXS["org.apache.ignite.cluster.ClusterGroupEmptyException"] = m => new ClusterGroupEmptyException(m); + EXS["org.apache.ignite.cluster.ClusterTopologyException"] = m => new ClusterTopologyException(m); + + // Compute exceptions. + EXS["org.apache.ignite.compute.ComputeExecutionRejectedException"] = m => new ComputeExecutionRejectedException(m); + EXS["org.apache.ignite.compute.ComputeJobFailoverException"] = m => new ComputeJobFailoverException(m); + EXS["org.apache.ignite.compute.ComputeTaskCancelledException"] = m => new ComputeTaskCancelledException(m); + EXS["org.apache.ignite.compute.ComputeTaskTimeoutException"] = m => new ComputeTaskTimeoutException(m); + EXS["org.apache.ignite.compute.ComputeUserUndeclaredException"] = m => new ComputeUserUndeclaredException(m); + + // Cache exceptions. + EXS["javax.cache.CacheException"] = m => new CacheException(m); + EXS["javax.cache.integration.CacheLoaderException"] = m => new CacheStoreException(m); + EXS["javax.cache.integration.CacheWriterException"] = m => new CacheStoreException(m); + EXS["javax.cache.processor.EntryProcessorException"] = m => new CacheEntryProcessorException(m); + EXS["org.apache.ignite.cache.CacheAtomicUpdateTimeoutException"] = m => new CacheAtomicUpdateTimeoutException(m); + + // Transaction exceptions. + EXS["org.apache.ignite.transactions.TransactionOptimisticException"] = m => new TransactionOptimisticException(m); + EXS["org.apache.ignite.transactions.TransactionTimeoutException"] = m => new TransactionTimeoutException(m); + EXS["org.apache.ignite.transactions.TransactionRollbackException"] = m => new TransactionRollbackException(m); + EXS["org.apache.ignite.transactions.TransactionHeuristicException"] = m => new TransactionHeuristicException(m); + + // Security exceptions. + EXS["org.apache.ignite.IgniteAuthenticationException"] = m => new SecurityException(m); + EXS["org.apache.ignite.plugin.security.GridSecurityException"] = m => new SecurityException(m); + } + + /// <summary> + /// Creates exception according to native code class and message. + /// </summary> + /// <param name="clsName">Exception class name.</param> + /// <param name="msg">Exception message.</param> + /// <param name="reader">Error data reader.</param> + public static Exception GetException(string clsName, string msg, PortableReaderImpl reader = null) + { + ExceptionFactoryDelegate ctor; + + if (EXS.TryGetValue(clsName, out ctor)) + return ctor(msg); + + if (ClsNoClsDefFoundErr.Equals(clsName)) + return new IgniteException("Java class is not found (did you set IGNITE_HOME environment " + + "variable?): " + msg); + + if (ClsNoSuchMthdErr.Equals(clsName)) + return new IgniteException("Java class method is not found (did you set IGNITE_HOME environment " + + "variable?): " + msg); + + if (ClsCachePartialUpdateErr.Equals(clsName)) + return ProcessCachePartialUpdateException(msg, reader); + + return new IgniteException("Java exception occurred [class=" + clsName + ", message=" + msg + ']'); + } + + /// <summary> + /// Process cache partial update exception. + /// </summary> + /// <param name="msg">Message.</param> + /// <param name="reader">Reader.</param> + /// <returns></returns> + private static Exception ProcessCachePartialUpdateException(string msg, PortableReaderImpl reader) + { + if (reader == null) + return new CachePartialUpdateException(msg, new IgniteException("Failed keys are not available.")); + + bool dataExists = reader.ReadBoolean(); + + Debug.Assert(dataExists); + + if (reader.ReadBoolean()) + { + bool keepPortable = reader.ReadBoolean(); + + PortableReaderImpl keysReader = reader.Marshaller.StartUnmarshal(reader.Stream, keepPortable); + + try + { + return new CachePartialUpdateException(msg, ReadNullableList(keysReader)); + } + catch (Exception e) + { + // Failed to deserialize data. + return new CachePartialUpdateException(msg, e); + } + } + + // Was not able to write keys. + string innerErrCls = reader.ReadString(); + string innerErrMsg = reader.ReadString(); + + Exception innerErr = GetException(innerErrCls, innerErrMsg); + + return new CachePartialUpdateException(msg, innerErr); + } + + /// <summary> + /// Create JVM initialization exception. + /// </summary> + /// <param name="clsName">Class name.</param> + /// <param name="msg">Message.</param> + /// <returns>Exception.</returns> + public static Exception GetJvmInitializeException(string clsName, string msg) + { + if (clsName != null) + return new IgniteException("Failed to initialize JVM.", GetException(clsName, msg)); + + if (msg != null) + return new IgniteException("Failed to initialize JVM: " + msg); + + return new IgniteException("Failed to initialize JVM."); + } + + /// <summary> + /// Reads nullable list. + /// </summary> + /// <param name="reader">Reader.</param> + /// <returns>List.</returns> + private static List<object> ReadNullableList(PortableReaderImpl reader) + { + if (!reader.ReadBoolean()) + return null; + + var size = reader.ReadInt(); + + var list = new List<object>(size); + + for (int i = 0; i < size; i++) + list.Add(reader.ReadObject<object>()); + + return list; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs index 2a67c41..9c8178f 100644 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Handle/HandleRegistry.cs @@ -253,7 +253,7 @@ namespace Apache.Ignite.Core.Impl.Handle } if (throwOnAbsent) - throw new InvalidOperationException("Resource handle has been released (is grid stopping?)."); + throw new InvalidOperationException("Resource handle has been released (is Ignite stopping?)."); return default(T); } @@ -333,7 +333,7 @@ namespace Apache.Ignite.Core.Impl.Handle /// <returns>Exception.</returns> private static Exception ClosedException() { - return new InvalidOperationException("Cannot allocate a resource handle because grid is stopping."); + return new InvalidOperationException("Cannot allocate a resource handle because Ignite is stopping."); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs new file mode 100644 index 0000000..91838d0 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/IInteropCallback.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl +{ + using Apache.Ignite.Core.Impl.Portable.IO; + + /// <summary> + /// Interop callback. + /// </summary> + internal interface IInteropCallback + { + /// <summary> + /// Invokes callback. + /// </summary> + /// <param name="stream">Stream.</param> + /// <returns>Invocation result.</returns> + int Invoke(IPortableStream stream); + } +} \ No newline at end of file
