http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs deleted file mode 100644 index 5738ed9..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous -{ - using Apache.Ignite.Core.Cache.Event; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Resource; - using CQU = ContinuousQueryUtils; - - /// <summary> - /// Continuous query filter interface. Required to hide generic nature of underliyng real filter. - /// </summary> - internal interface IContinuousQueryFilter - { - /// <summary> - /// Evaluate filter. - /// </summary> - /// <param name="stream">Stream.</param> - /// <returns>Result.</returns> - bool Evaluate(IPortableStream stream); - - /// <summary> - /// Inject grid. - /// </summary> - /// <param name="grid"></param> - void Inject(Ignite grid); - - /// <summary> - /// Allocate handle for the filter. - /// </summary> - /// <returns></returns> - long Allocate(); - - /// <summary> - /// Release filter. - /// </summary> - void Release(); - } - - /// <summary> - /// Continuous query filter generic implementation. - /// </summary> - internal class ContinuousQueryFilter<TK, TV> : IContinuousQueryFilter - { - /** Actual filter. */ - private readonly ICacheEntryEventFilter<TK, TV> _filter; - - /** Keep portable flag. */ - private readonly bool _keepPortable; - - /** Ignite hosting the filter. */ - private volatile Ignite _ignite; - - /** GC handle. */ - private long? _hnd; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="filter">Actual filter.</param> - /// <param name="keepPortable">Keep portable flag.</param> - public ContinuousQueryFilter(ICacheEntryEventFilter<TK, TV> filter, bool keepPortable) - { - _filter = filter; - _keepPortable = keepPortable; - } - - /** <inheritDoc /> */ - public bool Evaluate(IPortableStream stream) - { - ICacheEntryEvent<TK, TV> evt = CQU.ReadEvent<TK, TV>(stream, _ignite.Marshaller, _keepPortable); - - return _filter.Evaluate(evt); - } - - /** <inheritDoc /> */ - public void Inject(Ignite grid) - { - _ignite = grid; - - ResourceProcessor.Inject(_filter, grid); - } - - /** <inheritDoc /> */ - public long Allocate() - { - lock (this) - { - if (!_hnd.HasValue) - _hnd = _ignite.HandleRegistry.Allocate(this); - - return _hnd.Value; - } - } - - /** <inheritDoc /> */ - public void Release() - { - lock (this) - { - if (_hnd.HasValue) - { - _ignite.HandleRegistry.Release(_hnd.Value); - - _hnd = null; - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs deleted file mode 100644 index 65da674..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous -{ - using System; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Continuous query remote filter holder. Wraps real filter into portable object, - /// so that it can be passed over wire to another node. - /// </summary> - public class ContinuousQueryFilterHolder : IPortableWriteAware - { - /** Key type. */ - private readonly Type _keyTyp; - - /** Value type. */ - private readonly Type _valTyp; - - /** Filter object. */ - private readonly object _filter; - - /** Keep portable flag. */ - private readonly bool _keepPortable; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="keyTyp">Key type.</param> - /// <param name="valTyp">Value type.</param> - /// <param name="filter">Filter.</param> - /// <param name="keepPortable">Keep portable flag.</param> - public ContinuousQueryFilterHolder(Type keyTyp, Type valTyp, object filter, bool keepPortable) - { - _keyTyp = keyTyp; - _valTyp = valTyp; - _filter = filter; - _keepPortable = keepPortable; - } - - /// <summary> - /// Key type. - /// </summary> - internal Type KeyType - { - get { return _keyTyp; } - } - - /// <summary> - /// Value type. - /// </summary> - internal Type ValueType - { - get { return _valTyp; } - } - - /// <summary> - /// Filter. - /// </summary> - internal object Filter - { - get { return _filter; } - } - - /// <summary> - /// Keep portable flag. - /// </summary> - internal bool KeepPortable - { - get { return _keepPortable; } - } - - /// <summary> - /// Writes this object to the given writer. - /// </summary> - /// <param name="writer">Writer.</param> - public void WritePortable(IPortableWriter writer) - { - PortableWriterImpl rawWriter = (PortableWriterImpl) writer.RawWriter(); - - PortableUtils.WritePortableOrSerializable(rawWriter, _keyTyp); - PortableUtils.WritePortableOrSerializable(rawWriter, _valTyp); - PortableUtils.WritePortableOrSerializable(rawWriter, _filter); - - rawWriter.WriteBoolean(_keepPortable); - } - - /// <summary> - /// Initializes a new instance of the <see cref="ContinuousQueryFilterHolder"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public ContinuousQueryFilterHolder(IPortableReader reader) - { - PortableReaderImpl rawReader = (PortableReaderImpl) reader.RawReader(); - - _keyTyp = PortableUtils.ReadPortableOrSerializable<Type>(rawReader); - _valTyp = PortableUtils.ReadPortableOrSerializable<Type>(rawReader); - _filter = PortableUtils.ReadPortableOrSerializable<object>(rawReader); - _keepPortable = rawReader.ReadBoolean(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs deleted file mode 100644 index d8d014b..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous -{ - using System; - using System.Diagnostics; - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Cache.Event; - using Apache.Ignite.Core.Cache.Query; - using Apache.Ignite.Core.Cache.Query.Continuous; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - using CQU = ContinuousQueryUtils; - - /// <summary> - /// Continuous query handle interface. - /// </summary> - internal interface IContinuousQueryHandleImpl : IDisposable - { - /// <summary> - /// Process callback. - /// </summary> - /// <param name="stream">Stream.</param> - /// <returns>Result.</returns> - void Apply(IPortableStream stream); - } - - /// <summary> - /// Continuous query handle. - /// </summary> - internal class ContinuousQueryHandleImpl<TK, TV> : IContinuousQueryHandleImpl, IContinuousQueryFilter, - IContinuousQueryHandle<ICacheEntry<TK, TV>> - { - /** Marshaller. */ - private readonly PortableMarshaller _marsh; - - /** Keep portable flag. */ - private readonly bool _keepPortable; - - /** Real listener. */ - private readonly ICacheEntryEventListener<TK, TV> _lsnr; - - /** Real filter. */ - private readonly ICacheEntryEventFilter<TK, TV> _filter; - - /** GC handle. */ - private long _hnd; - - /** Native query. */ - private volatile IUnmanagedTarget _nativeQry; - - /** Initial query cursor. */ - private volatile IQueryCursor<ICacheEntry<TK, TV>> _initialQueryCursor; - - /** Disposed flag. */ - private bool _disposed; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="qry">Query.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="keepPortable">Keep portable flag.</param> - public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, PortableMarshaller marsh, bool keepPortable) - { - _marsh = marsh; - _keepPortable = keepPortable; - - _lsnr = qry.Listener; - _filter = qry.Filter; - } - - /// <summary> - /// Start execution. - /// </summary> - /// <param name="grid">Ignite instance.</param> - /// <param name="writer">Writer.</param> - /// <param name="cb">Callback invoked when all necessary data is written to stream.</param> - /// <param name="qry">Query.</param> - public void Start(Ignite grid, PortableWriterImpl writer, Func<IUnmanagedTarget> cb, - ContinuousQuery<TK, TV> qry) - { - // 1. Inject resources. - ResourceProcessor.Inject(_lsnr, grid); - ResourceProcessor.Inject(_filter, grid); - - // 2. Allocate handle. - _hnd = grid.HandleRegistry.Allocate(this); - - // 3. Write data to stream. - writer.WriteLong(_hnd); - writer.WriteBoolean(qry.Local); - writer.WriteBoolean(_filter != null); - - ContinuousQueryFilterHolder filterHolder = _filter == null || qry.Local ? null : - new ContinuousQueryFilterHolder(typeof (TK), typeof (TV), _filter, _keepPortable); - - writer.WriteObject(filterHolder); - - writer.WriteInt(qry.BufferSize); - writer.WriteLong((long)qry.TimeInterval.TotalMilliseconds); - writer.WriteBoolean(qry.AutoUnsubscribe); - - // 4. Call Java. - _nativeQry = cb(); - - // 5. Initial query. - var nativeInitialQryCur = UU.ContinuousQueryGetInitialQueryCursor(_nativeQry); - _initialQueryCursor = nativeInitialQryCur == null - ? null - : new QueryCursor<TK, TV>(nativeInitialQryCur, _marsh, _keepPortable); - } - - /** <inheritdoc /> */ - public void Apply(IPortableStream stream) - { - ICacheEntryEvent<TK, TV>[] evts = CQU.ReadEvents<TK, TV>(stream, _marsh, _keepPortable); - - _lsnr.OnEvent(evts); - } - - /** <inheritdoc /> */ - public bool Evaluate(IPortableStream stream) - { - Debug.Assert(_filter != null, "Evaluate should not be called if filter is not set."); - - ICacheEntryEvent<TK, TV> evt = CQU.ReadEvent<TK, TV>(stream, _marsh, _keepPortable); - - return _filter.Evaluate(evt); - } - - /** <inheritdoc /> */ - public void Inject(Ignite grid) - { - throw new NotSupportedException("Should not be called."); - } - - /** <inheritdoc /> */ - public long Allocate() - { - throw new NotSupportedException("Should not be called."); - } - - /** <inheritdoc /> */ - public void Release() - { - _marsh.Ignite.HandleRegistry.Release(_hnd); - } - - /** <inheritdoc /> */ - public IQueryCursor<ICacheEntry<TK, TV>> GetInitialQueryCursor() - { - lock (this) - { - if (_disposed) - throw new ObjectDisposedException("Continuous query handle has been disposed."); - - var cur = _initialQueryCursor; - - if (cur == null) - throw new InvalidOperationException("GetInitialQueryCursor() can be called only once."); - - _initialQueryCursor = null; - - return cur; - } - } - - /** <inheritdoc /> */ - public void Dispose() - { - lock (this) - { - if (_disposed) - return; - - Debug.Assert(_nativeQry != null); - - try - { - UU.ContinuousQueryClose(_nativeQry); - } - finally - { - _nativeQry.Dispose(); - - _disposed = true; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs deleted file mode 100644 index 86c8300..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous -{ - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using Apache.Ignite.Core.Cache.Event; - using Apache.Ignite.Core.Impl.Cache.Event; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - - /// <summary> - /// Utility methods for continuous queries. - /// </summary> - static class ContinuousQueryUtils - { - /// <summary> - /// Read single event. - /// </summary> - /// <param name="stream">Stream to read data from.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="keepPortable">Keep portable flag.</param> - /// <returns>Event.</returns> - public static ICacheEntryEvent<TK, TV> ReadEvent<TK, TV>(IPortableStream stream, - PortableMarshaller marsh, bool keepPortable) - { - var reader = marsh.StartUnmarshal(stream, keepPortable); - - return ReadEvent0<TK, TV>(reader); - } - - /// <summary> - /// Read multiple events. - /// </summary> - /// <param name="stream">Stream.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="keepPortable">Keep portable flag.</param> - /// <returns>Events.</returns> - [SuppressMessage("ReSharper", "PossibleNullReferenceException")] - public static ICacheEntryEvent<TK, TV>[] ReadEvents<TK, TV>(IPortableStream stream, - PortableMarshaller marsh, bool keepPortable) - { - var reader = marsh.StartUnmarshal(stream, keepPortable); - - int cnt = reader.ReadInt(); - - ICacheEntryEvent<TK, TV>[] evts = new ICacheEntryEvent<TK, TV>[cnt]; - - for (int i = 0; i < cnt; i++) - evts[i] = ReadEvent0<TK, TV>(reader); - - return evts; - } - - /// <summary> - /// Read event. - /// </summary> - /// <param name="reader">Reader.</param> - /// <returns>Event.</returns> - private static ICacheEntryEvent<TK, TV> ReadEvent0<TK, TV>(PortableReaderImpl reader) - { - reader.DetachNext(); - TK key = reader.ReadObject<TK>(); - - reader.DetachNext(); - TV oldVal = reader.ReadObject<TV>(); - - reader.DetachNext(); - TV val = reader.ReadObject<TV>(); - - return CreateEvent(key, oldVal, val); - } - - /// <summary> - /// Create event. - /// </summary> - /// <param name="key">Key.</param> - /// <param name="oldVal">Old value.</param> - /// <param name="val">Value.</param> - /// <returns>Event.</returns> - public static ICacheEntryEvent<TK, TV> CreateEvent<TK, TV>(TK key, TV oldVal, TV val) - { - if (oldVal == null) - { - Debug.Assert(val != null); - - return new CacheEntryCreateEvent<TK, TV>(key, val); - } - - if (val == null) - { - Debug.Assert(oldVal != null); - - return new CacheEntryRemoveEvent<TK, TV>(key, oldVal); - } - - return new CacheEntryUpdateEvent<TK, TV>(key, oldVal, val); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs deleted file mode 100644 index f38346c..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query -{ - using System.Collections; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Unmanaged; - - /// <summary> - /// Cursor for entry-based queries. - /// </summary> - internal class FieldsQueryCursor : AbstractQueryCursor<IList> - { - /// <summary> - /// Constructor. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaler.</param> - /// <param name="keepPortable">Keep poratble flag.</param> - public FieldsQueryCursor(IUnmanagedTarget target, PortableMarshaller marsh, bool keepPortable) - : base(target, marsh, keepPortable) - { - // No-op. - } - - /** <inheritdoc /> */ - protected override IList Read(PortableReaderImpl reader) - { - int cnt = reader.ReadInt(); - - var res = new ArrayList(cnt); - - for (int i = 0; i < cnt; i++) - res.Add(reader.ReadObject<object>()); - - return res; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs deleted file mode 100644 index 0b113f5..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query -{ - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Unmanaged; - - /// <summary> - /// Cursor for entry-based queries. - /// </summary> - internal class QueryCursor<TK, TV> : AbstractQueryCursor<ICacheEntry<TK, TV>> - { - /// <summary> - /// Constructor. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaler.</param> - /// <param name="keepPortable">Keep poratble flag.</param> - public QueryCursor(IUnmanagedTarget target, PortableMarshaller marsh, - bool keepPortable) : base(target, marsh, keepPortable) - { - // No-op. - } - - /** <inheritdoc /> */ - protected override ICacheEntry<TK, TV> Read(PortableReaderImpl reader) - { - TK key = reader.ReadObject<TK>(); - TV val = reader.ReadObject<TV>(); - - return new CacheEntry<TK, TV>(key, val); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs deleted file mode 100644 index 3fbc705..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Store -{ - using System.Collections; - using System.Diagnostics; - using Apache.Ignite.Core.Cache.Store; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Impl.Handle; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.IO; - using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Impl.Unmanaged; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Interop cache store. - /// </summary> - internal class CacheStore - { - /** */ - private const byte OpLoadCache = 0; - - /** */ - private const byte OpLoad = 1; - - /** */ - private const byte OpLoadAll = 2; - - /** */ - private const byte OpPut = 3; - - /** */ - private const byte OpPutAll = 4; - - /** */ - private const byte OpRmv = 5; - - /** */ - private const byte OpRmvAll = 6; - - /** */ - private const byte OpSesEnd = 7; - - /** */ - private readonly bool _convertPortable; - - /** Store. */ - private readonly ICacheStore _store; - - /** Session. */ - private readonly CacheStoreSessionProxy _sesProxy; - - /** */ - private readonly long _handle; - - /// <summary> - /// Initializes a new instance of the <see cref="CacheStore" /> class. - /// </summary> - /// <param name="store">Store.</param> - /// <param name="convertPortable">Whether to convert portable objects.</param> - /// <param name="registry">The handle registry.</param> - private CacheStore(ICacheStore store, bool convertPortable, HandleRegistry registry) - { - Debug.Assert(store != null); - - _store = store; - _convertPortable = convertPortable; - - _sesProxy = new CacheStoreSessionProxy(); - - ResourceProcessor.InjectStoreSession(store, _sesProxy); - - _handle = registry.AllocateCritical(this); - } - - /// <summary> - /// Creates interop cache store from a stream. - /// </summary> - /// <param name="memPtr">Memory pointer.</param> - /// <param name="registry">The handle registry.</param> - /// <returns> - /// Interop cache store. - /// </returns> - internal static CacheStore CreateInstance(long memPtr, HandleRegistry registry) - { - using (var stream = IgniteManager.Memory.Get(memPtr).Stream()) - { - var reader = PortableUtils.Marshaller.StartUnmarshal(stream, PortableMode.KeepPortable); - - var assemblyName = reader.ReadString(); - var className = reader.ReadString(); - var convertPortable = reader.ReadBoolean(); - var propertyMap = reader.ReadGenericDictionary<string, object>(); - - var store = (ICacheStore) IgniteUtils.CreateInstance(assemblyName, className); - - IgniteUtils.SetProperties(store, propertyMap); - - return new CacheStore(store, convertPortable, registry); - } - } - - /// <summary> - /// Gets the handle. - /// </summary> - public long Handle - { - get { return _handle; } - } - - /// <summary> - /// Initializes this instance with a grid. - /// </summary> - /// <param name="grid">Grid.</param> - public void Init(Ignite grid) - { - ResourceProcessor.Inject(_store, grid); - } - - /// <summary> - /// Invokes a store operation. - /// </summary> - /// <param name="input">Input stream.</param> - /// <param name="cb">Callback.</param> - /// <param name="grid">Grid.</param> - /// <returns>Invocation result.</returns> - /// <exception cref="IgniteException">Invalid operation type: + opType</exception> - public int Invoke(IPortableStream input, IUnmanagedTarget cb, Ignite grid) - { - IPortableReader reader = grid.Marshaller.StartUnmarshal(input, - _convertPortable ? PortableMode.Deserialize : PortableMode.ForcePortable); - - IPortableRawReader rawReader = reader.RawReader(); - - int opType = rawReader.ReadByte(); - - // Setup cache sessoin for this invocation. - long sesId = rawReader.ReadLong(); - - CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true); - - ses.CacheName = rawReader.ReadString(); - - _sesProxy.SetSession(ses); - - try - { - // Perform operation. - switch (opType) - { - case OpLoadCache: - _store.LoadCache((k, v) => WriteObjects(cb, grid, k, v), rawReader.ReadObjectArray<object>()); - - break; - - case OpLoad: - object val = _store.Load(rawReader.ReadObject<object>()); - - if (val != null) - WriteObjects(cb, grid, val); - - break; - - case OpLoadAll: - var keys = rawReader.ReadCollection(); - - var result = _store.LoadAll(keys); - - foreach (DictionaryEntry entry in result) - WriteObjects(cb, grid, entry.Key, entry.Value); - - break; - - case OpPut: - _store.Write(rawReader.ReadObject<object>(), rawReader.ReadObject<object>()); - - break; - - case OpPutAll: - _store.WriteAll(rawReader.ReadDictionary()); - - break; - - case OpRmv: - _store.Delete(rawReader.ReadObject<object>()); - - break; - - case OpRmvAll: - _store.DeleteAll(rawReader.ReadCollection()); - - break; - - case OpSesEnd: - grid.HandleRegistry.Release(sesId); - - _store.SessionEnd(rawReader.ReadBoolean()); - - break; - - default: - throw new IgniteException("Invalid operation type: " + opType); - } - - return 0; - } - finally - { - _sesProxy.ClearSession(); - } - } - - /// <summary> - /// Writes objects to the marshaller. - /// </summary> - /// <param name="cb">Optional callback.</param> - /// <param name="grid">Grid.</param> - /// <param name="objects">Objects.</param> - private static void WriteObjects(IUnmanagedTarget cb, Ignite grid, params object[] objects) - { - using (var stream = IgniteManager.Memory.Allocate().Stream()) - { - PortableWriterImpl writer = grid.Marshaller.StartMarshal(stream); - - try - { - foreach (var obj in objects) - { - writer.DetachNext(); - writer.WriteObject(obj); - } - } - finally - { - grid.Marshaller.FinishMarshal(writer); - } - - if (cb != null) - { - stream.SynchronizeOutput(); - - UnmanagedUtils.CacheStoreCallbackInvoke(cb, stream.MemoryPointer); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs deleted file mode 100644 index f771fe8..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSession.cs +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Store -{ - using System.Collections.Generic; - using Apache.Ignite.Core.Cache.Store; - - /// <summary> - /// Store session implementation. - /// </summary> - internal class CacheStoreSession : ICacheStoreSession - { - /** Properties. */ - private IDictionary<object, object> _props; - - /** <inheritdoc /> */ - - public string CacheName - { - get; internal set; - } - - /** <inheritdoc /> */ - public IDictionary<object, object> Properties - { - get { return _props ?? (_props = new Dictionary<object, object>(2)); } - } - - /// <summary> - /// Clear session state. - /// </summary> - public void Clear() - { - if (_props != null) - _props.Clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs deleted file mode 100644 index 3dd7354..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStoreSessionProxy.cs +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Store -{ - using System.Collections.Generic; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Cache.Store; - - /// <summary> - /// Store session proxy. - /// </summary> - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class CacheStoreSessionProxy : ICacheStoreSession - { - /** Session. */ - private readonly ThreadLocal<CacheStoreSession> _target = new ThreadLocal<CacheStoreSession>(); - - /** <inheritdoc /> */ - public string CacheName - { - get { return _target.Value.CacheName; } - } - - /** <inheritdoc /> */ - public IDictionary<object, object> Properties - { - get { return _target.Value.Properties; } - } - - /// <summary> - /// Set thread-bound session. - /// </summary> - /// <param name="ses">Session.</param> - internal void SetSession(CacheStoreSession ses) - { - _target.Value = ses; - } - - /// <summary> - /// Clear thread-bound session. - /// </summary> - internal void ClearSession() - { - _target.Value = null; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs deleted file mode 100644 index 382ab1e..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs +++ /dev/null @@ -1,577 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cluster -{ - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.Linq; - using System.Threading; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - using Apache.Ignite.Core.Events; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Compute; - using Apache.Ignite.Core.Impl.Events; - using Apache.Ignite.Core.Impl.Messaging; - using Apache.Ignite.Core.Impl.Portable; - using Apache.Ignite.Core.Impl.Portable.Metadata; - using Apache.Ignite.Core.Impl.Services; - using Apache.Ignite.Core.Impl.Unmanaged; - using Apache.Ignite.Core.Messaging; - using Apache.Ignite.Core.Portable; - using Apache.Ignite.Core.Services; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; - - /// <summary> - /// Ignite projection implementation. - /// </summary> - internal class ClusterGroupImpl : PlatformTarget, IClusterGroupEx - { - /** Attribute: platform. */ - private const string AttrPlatform = "org.apache.ignite.platform"; - - /** Platform. */ - private const string Platform = "dotnet"; - - /** Initial topver; invalid from Java perspective, so update will be triggered when this value is met. */ - private const int TopVerInit = 0; - - /** */ - private const int OpAllMetadata = 1; - - /** */ - private const int OpForAttribute = 2; - - /** */ - private const int OpForCache = 3; - - /** */ - private const int OpForClient = 4; - - /** */ - private const int OpForData = 5; - - /** */ - private const int OpForHost = 6; - - /** */ - private const int OpForNodeIds = 7; - - /** */ - private const int OpMetadata = 8; - - /** */ - private const int OpMetrics = 9; - - /** */ - private const int OpMetricsFiltered = 10; - - /** */ - private const int OpNodeMetrics = 11; - - /** */ - private const int OpNodes = 12; - - /** */ - private const int OpPingNode = 13; - - /** */ - private const int OpTopology = 14; - - /** Initial Ignite instance. */ - private readonly Ignite _ignite; - - /** Predicate. */ - private readonly Func<IClusterNode, bool> _pred; - - /** Topology version. */ - [SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")] - private long _topVer = TopVerInit; - - /** Nodes for the given topology version. */ - private volatile IList<IClusterNode> _nodes; - - /** Processor. */ - private readonly IUnmanagedTarget _proc; - - /** Compute. */ - private readonly Lazy<Compute> _comp; - - /** Messaging. */ - private readonly Lazy<Messaging> _msg; - - /** Events. */ - private readonly Lazy<Events> _events; - - /** Services. */ - private readonly Lazy<IServices> _services; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="proc">Processor.</param> - /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - /// <param name="ignite">Grid.</param> - /// <param name="pred">Predicate.</param> - public ClusterGroupImpl(IUnmanagedTarget proc, IUnmanagedTarget target, PortableMarshaller marsh, - Ignite ignite, Func<IClusterNode, bool> pred) - : base(target, marsh) - { - _proc = proc; - _ignite = ignite; - _pred = pred; - - _comp = new Lazy<Compute>(() => - new Compute(new ComputeImpl(UU.ProcessorCompute(proc, target), marsh, this, false))); - - _msg = new Lazy<Messaging>(() => new Messaging(UU.ProcessorMessage(proc, target), marsh, this)); - - _events = new Lazy<Events>(() => new Events(UU.ProcessorEvents(proc, target), marsh, this)); - - _services = new Lazy<IServices>(() => - new Services(UU.ProcessorServices(proc, target), marsh, this, false, false)); - } - - /** <inheritDoc /> */ - public IIgnite Ignite - { - get { return _ignite; } - } - - /** <inheritDoc /> */ - public ICompute GetCompute() - { - return _comp.Value; - } - - /** <inheritDoc /> */ - public IClusterGroup ForNodes(IEnumerable<IClusterNode> nodes) - { - IgniteArgumentCheck.NotNull(nodes, "nodes"); - - return ForNodeIds0(nodes, node => node.Id); - } - - /** <inheritDoc /> */ - public IClusterGroup ForNodes(params IClusterNode[] nodes) - { - IgniteArgumentCheck.NotNull(nodes, "nodes"); - - return ForNodeIds0(nodes, node => node.Id); - } - - /** <inheritDoc /> */ - public IClusterGroup ForNodeIds(IEnumerable<Guid> ids) - { - IgniteArgumentCheck.NotNull(ids, "ids"); - - return ForNodeIds0(ids, null); - } - - /** <inheritDoc /> */ - public IClusterGroup ForNodeIds(params Guid[] ids) - { - IgniteArgumentCheck.NotNull(ids, "ids"); - - return ForNodeIds0(ids, null); - } - - /// <summary> - /// Internal routine to get projection for specific node IDs. - /// </summary> - /// <param name="items">Items.</param> - /// <param name="func">Function to transform item to Guid (optional).</param> - /// <returns></returns> - private IClusterGroup ForNodeIds0<T>(IEnumerable<T> items, Func<T, Guid> func) - { - Debug.Assert(items != null); - - IUnmanagedTarget prj = DoProjetionOutOp(OpForNodeIds, writer => - { - WriteEnumerable(writer, items, func); - }); - - return GetClusterGroup(prj); - } - - /** <inheritDoc /> */ - public IClusterGroup ForPredicate(Func<IClusterNode, bool> p) - { - var newPred = _pred == null ? p : node => _pred(node) && p(node); - - return new ClusterGroupImpl(_proc, Target, Marshaller, _ignite, newPred); - } - - /** <inheritDoc /> */ - public IClusterGroup ForAttribute(string name, string val) - { - IgniteArgumentCheck.NotNull(name, "name"); - - IUnmanagedTarget prj = DoProjetionOutOp(OpForAttribute, writer => - { - writer.WriteString(name); - writer.WriteString(val); - }); - - return GetClusterGroup(prj); - } - - /// <summary> - /// Creates projection with a specified op. - /// </summary> - /// <param name="name">Cache name to include into projection.</param> - /// <param name="op">Operation id.</param> - /// <returns> - /// Projection over nodes that have specified cache running. - /// </returns> - private IClusterGroup ForCacheNodes(string name, int op) - { - IUnmanagedTarget prj = DoProjetionOutOp(op, writer => - { - writer.WriteString(name); - }); - - return GetClusterGroup(prj); - } - - /** <inheritDoc /> */ - public IClusterGroup ForCacheNodes(string name) - { - return ForCacheNodes(name, OpForCache); - } - - /** <inheritDoc /> */ - public IClusterGroup ForDataNodes(string name) - { - return ForCacheNodes(name, OpForData); - } - - /** <inheritDoc /> */ - public IClusterGroup ForClientNodes(string name) - { - return ForCacheNodes(name, OpForClient); - } - - /** <inheritDoc /> */ - public IClusterGroup ForRemotes() - { - return GetClusterGroup(UU.ProjectionForRemotes(Target)); - } - - /** <inheritDoc /> */ - public IClusterGroup ForHost(IClusterNode node) - { - IgniteArgumentCheck.NotNull(node, "node"); - - IUnmanagedTarget prj = DoProjetionOutOp(OpForHost, writer => - { - writer.WriteGuid(node.Id); - }); - - return GetClusterGroup(prj); - } - - /** <inheritDoc /> */ - public IClusterGroup ForRandom() - { - return GetClusterGroup(UU.ProjectionForRandom(Target)); - } - - /** <inheritDoc /> */ - public IClusterGroup ForOldest() - { - return GetClusterGroup(UU.ProjectionForOldest(Target)); - } - - /** <inheritDoc /> */ - public IClusterGroup ForYoungest() - { - return GetClusterGroup(UU.ProjectionForYoungest(Target)); - } - - /** <inheritDoc /> */ - public IClusterGroup ForDotNet() - { - return ForAttribute(AttrPlatform, Platform); - } - - /** <inheritDoc /> */ - public ICollection<IClusterNode> GetNodes() - { - return RefreshNodes(); - } - - /** <inheritDoc /> */ - public IClusterNode GetNode(Guid id) - { - return GetNodes().FirstOrDefault(node => node.Id == id); - } - - /** <inheritDoc /> */ - public IClusterNode GetNode() - { - return GetNodes().FirstOrDefault(); - } - - /** <inheritDoc /> */ - public IClusterMetrics GetMetrics() - { - if (_pred == null) - { - return DoInOp(OpMetrics, stream => - { - IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false); - - return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null; - }); - } - return DoOutInOp(OpMetricsFiltered, writer => - { - WriteEnumerable(writer, GetNodes().Select(node => node.Id)); - }, stream => - { - IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false); - - return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null; - }); - } - - /** <inheritDoc /> */ - public IMessaging GetMessaging() - { - return _msg.Value; - } - - /** <inheritDoc /> */ - public IEvents GetEvents() - { - return _events.Value; - } - - /** <inheritDoc /> */ - public IServices GetServices() - { - return _services.Value; - } - - /// <summary> - /// Pings a remote node. - /// </summary> - /// <param name="nodeId">ID of a node to ping.</param> - /// <returns>True if node for a given ID is alive, false otherwise.</returns> - internal bool PingNode(Guid nodeId) - { - return DoOutOp(OpPingNode, nodeId) == True; - } - - /// <summary> - /// Predicate (if any). - /// </summary> - public Func<IClusterNode, bool> Predicate - { - get { return _pred; } - } - - /// <summary> - /// Refresh cluster node metrics. - /// </summary> - /// <param name="nodeId">Node</param> - /// <param name="lastUpdateTime"></param> - /// <returns></returns> - internal ClusterMetricsImpl RefreshClusterNodeMetrics(Guid nodeId, long lastUpdateTime) - { - return DoOutInOp(OpNodeMetrics, writer => - { - writer.WriteGuid(nodeId); - writer.WriteLong(lastUpdateTime); - }, stream => - { - IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false); - - return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null; - } - ); - } - - /// <summary> - /// Gets a topology by version. Returns null if topology history storage doesn't contain - /// specified topology version (history currently keeps the last 1000 snapshots). - /// </summary> - /// <param name="version">Topology version.</param> - /// <returns>Collection of Ignite nodes which represented by specified topology version, - /// if it is present in history storage, {@code null} otherwise.</returns> - /// <exception cref="IgniteException">If underlying SPI implementation does not support - /// topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} - /// supports topology history.</exception> - internal ICollection<IClusterNode> Topology(long version) - { - return DoOutInOp(OpTopology, writer => writer.WriteLong(version), - input => IgniteUtils.ReadNodes(Marshaller.StartUnmarshal(input))); - } - - /// <summary> - /// Topology version. - /// </summary> - internal long TopologyVersion - { - get - { - RefreshNodes(); - - return Interlocked.Read(ref _topVer); - } - } - - /// <summary> - /// Update topology. - /// </summary> - /// <param name="newTopVer">New topology version.</param> - /// <param name="newNodes">New nodes.</param> - internal void UpdateTopology(long newTopVer, List<IClusterNode> newNodes) - { - lock (this) - { - // If another thread already advanced topology version further, we still - // can safely return currently received nodes, but we will not assign them. - if (_topVer < newTopVer) - { - Interlocked.Exchange(ref _topVer, newTopVer); - - _nodes = newNodes.AsReadOnly(); - } - } - } - - /// <summary> - /// Get current nodes without refreshing the topology. - /// </summary> - /// <returns>Current nodes.</returns> - internal IList<IClusterNode> NodesNoRefresh() - { - return _nodes; - } - - /// <summary> - /// Creates new Cluster Group from given native projection. - /// </summary> - /// <param name="prj">Native projection.</param> - /// <returns>New cluster group.</returns> - private IClusterGroup GetClusterGroup(IUnmanagedTarget prj) - { - return new ClusterGroupImpl(_proc, prj, Marshaller, _ignite, _pred); - } - - /// <summary> - /// Refresh projection nodes. - /// </summary> - /// <returns>Nodes.</returns> - private IList<IClusterNode> RefreshNodes() - { - long oldTopVer = Interlocked.Read(ref _topVer); - - List<IClusterNode> newNodes = null; - - DoOutInOp(OpNodes, writer => - { - writer.WriteLong(oldTopVer); - }, input => - { - PortableReaderImpl reader = Marshaller.StartUnmarshal(input); - - if (reader.ReadBoolean()) - { - // Topology has been updated. - long newTopVer = reader.ReadLong(); - - newNodes = IgniteUtils.ReadNodes(reader, _pred); - - UpdateTopology(newTopVer, newNodes); - } - }); - - if (newNodes != null) - return newNodes; - - // No topology changes. - Debug.Assert(_nodes != null, "At least one topology update should have occurred."); - - return _nodes; - } - - /// <summary> - /// Perform synchronous out operation returning value. - /// </summary> - /// <param name="type">Operation type.</param> - /// <param name="action">Action.</param> - /// <returns>Native projection.</returns> - private IUnmanagedTarget DoProjetionOutOp(int type, Action<PortableWriterImpl> action) - { - using (var stream = IgniteManager.Memory.Allocate().Stream()) - { - var writer = Marshaller.StartMarshal(stream); - - action(writer); - - FinishMarshal(writer); - - return UU.ProjectionOutOpRet(Target, type, stream.SynchronizeOutput()); - } - } - - /** <inheritDoc /> */ - public IPortableMetadata Metadata(int typeId) - { - return DoOutInOp<IPortableMetadata>(OpMetadata, - writer => - { - writer.WriteInt(typeId); - }, - stream => - { - PortableReaderImpl reader = Marshaller.StartUnmarshal(stream, false); - - return reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null; - } - ); - } - - /// <summary> - /// Gets metadata for all known types. - /// </summary> - public List<IPortableMetadata> Metadata() - { - return DoInOp(OpAllMetadata, s => - { - var reader = Marshaller.StartUnmarshal(s); - - var size = reader.ReadInt(); - - var res = new List<IPortableMetadata>(size); - - for (var i = 0; i < size; i++) - res.Add(reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null); - - return res; - }); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs deleted file mode 100644 index 664a1f1..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cluster -{ - using System; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Cluster metrics implementation. - /// </summary> - internal class ClusterMetricsImpl : IClusterMetrics - { - /// <summary> - /// Initializes a new instance of the <see cref="ClusterMetricsImpl"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public ClusterMetricsImpl(IPortableRawReader reader) - { - LastUpdateTimeRaw = reader.ReadLong(); - - DateTime? lastUpdateTime0 = reader.ReadDate(); - - LastUpdateTime = lastUpdateTime0 ?? default(DateTime); - MaximumActiveJobs = reader.ReadInt(); - CurrentActiveJobs = reader.ReadInt(); - AverageActiveJobs = reader.ReadFloat(); - MaximumWaitingJobs = reader.ReadInt(); - - CurrentWaitingJobs = reader.ReadInt(); - AverageWaitingJobs = reader.ReadFloat(); - MaximumRejectedJobs = reader.ReadInt(); - CurrentRejectedJobs = reader.ReadInt(); - AverageRejectedJobs = reader.ReadFloat(); - - TotalRejectedJobs = reader.ReadInt(); - MaximumCancelledJobs = reader.ReadInt(); - CurrentCancelledJobs = reader.ReadInt(); - AverageCancelledJobs = reader.ReadFloat(); - TotalCancelledJobs = reader.ReadInt(); - - TotalExecutedJobs = reader.ReadInt(); - MaximumJobWaitTime = reader.ReadLong(); - CurrentJobWaitTime = reader.ReadLong(); - AverageJobWaitTime = reader.ReadDouble(); - MaximumJobExecuteTime = reader.ReadLong(); - - CurrentJobExecuteTime = reader.ReadLong(); - AverageJobExecuteTime = reader.ReadDouble(); - TotalExecutedTasks = reader.ReadInt(); - TotalIdleTime = reader.ReadLong(); - CurrentIdleTime = reader.ReadLong(); - - TotalCpus = reader.ReadInt(); - CurrentCpuLoad = reader.ReadDouble(); - AverageCpuLoad = reader.ReadDouble(); - CurrentGcCpuLoad = reader.ReadDouble(); - HeapMemoryInitialized = reader.ReadLong(); - - HeapMemoryUsed = reader.ReadLong(); - HeapMemoryCommitted = reader.ReadLong(); - HeapMemoryMaximum = reader.ReadLong(); - HeapMemoryTotal = reader.ReadLong(); - NonHeapMemoryInitialized = reader.ReadLong(); - - NonHeapMemoryUsed = reader.ReadLong(); - NonHeapMemoryCommitted = reader.ReadLong(); - NonHeapMemoryMaximum = reader.ReadLong(); - NonHeapMemoryTotal = reader.ReadLong(); - UpTime = reader.ReadLong(); - - DateTime? startTime0 = reader.ReadDate(); - - StartTime = startTime0 ?? default(DateTime); - - DateTime? nodeStartTime0 = reader.ReadDate(); - - NodeStartTime = nodeStartTime0 ?? default(DateTime); - - CurrentThreadCount = reader.ReadInt(); - MaximumThreadCount = reader.ReadInt(); - TotalStartedThreadCount = reader.ReadLong(); - CurrentDaemonThreadCount = reader.ReadInt(); - LastDataVersion = reader.ReadLong(); - - SentMessagesCount = reader.ReadInt(); - SentBytesCount = reader.ReadLong(); - ReceivedMessagesCount = reader.ReadInt(); - ReceivedBytesCount = reader.ReadLong(); - OutboundMessagesQueueSize = reader.ReadInt(); - - TotalNodes = reader.ReadInt(); - } - - /// <summary> - /// Last update time in raw format. - /// </summary> - internal long LastUpdateTimeRaw { get; set; } - - /** <inheritDoc /> */ - public DateTime LastUpdateTime { get; private set; } - - /** <inheritDoc /> */ - public int MaximumActiveJobs { get; private set; } - - /** <inheritDoc /> */ - public int CurrentActiveJobs { get; private set; } - - /** <inheritDoc /> */ - public float AverageActiveJobs { get; private set; } - - /** <inheritDoc /> */ - public int MaximumWaitingJobs { get; private set; } - - /** <inheritDoc /> */ - public int CurrentWaitingJobs { get; private set; } - - /** <inheritDoc /> */ - public float AverageWaitingJobs { get; private set; } - - /** <inheritDoc /> */ - public int MaximumRejectedJobs { get; private set; } - - /** <inheritDoc /> */ - public int CurrentRejectedJobs { get; private set; } - - /** <inheritDoc /> */ - public float AverageRejectedJobs { get; private set; } - - /** <inheritDoc /> */ - public int TotalRejectedJobs { get; private set; } - - /** <inheritDoc /> */ - public int MaximumCancelledJobs { get; private set; } - - /** <inheritDoc /> */ - public int CurrentCancelledJobs { get; private set; } - - /** <inheritDoc /> */ - public float AverageCancelledJobs { get; private set; } - - /** <inheritDoc /> */ - public int TotalCancelledJobs { get; private set; } - - /** <inheritDoc /> */ - public int TotalExecutedJobs { get; private set; } - - /** <inheritDoc /> */ - public long MaximumJobWaitTime { get; private set; } - - /** <inheritDoc /> */ - public long CurrentJobWaitTime { get; private set; } - - /** <inheritDoc /> */ - public double AverageJobWaitTime { get; private set; } - - /** <inheritDoc /> */ - public long MaximumJobExecuteTime { get; private set; } - - /** <inheritDoc /> */ - public long CurrentJobExecuteTime { get; private set; } - - /** <inheritDoc /> */ - public double AverageJobExecuteTime { get; private set; } - - /** <inheritDoc /> */ - public int TotalExecutedTasks { get; private set; } - - /** <inheritDoc /> */ - public long TotalBusyTime - { - get { return UpTime - TotalIdleTime; } - } - - /** <inheritDoc /> */ - public long TotalIdleTime { get; private set; } - - /** <inheritDoc /> */ - public long CurrentIdleTime { get; private set; } - - /** <inheritDoc /> */ - public float BusyTimePercentage - { - get { return 1 - IdleTimePercentage; } - } - - /** <inheritDoc /> */ - public float IdleTimePercentage - { - get { return TotalIdleTime / (float) UpTime; } - } - - /** <inheritDoc /> */ - public int TotalCpus { get; private set; } - - /** <inheritDoc /> */ - public double CurrentCpuLoad { get; private set; } - - /** <inheritDoc /> */ - public double AverageCpuLoad { get; private set; } - - /** <inheritDoc /> */ - public double CurrentGcCpuLoad { get; private set; } - - /** <inheritDoc /> */ - public long HeapMemoryInitialized { get; private set; } - - /** <inheritDoc /> */ - public long HeapMemoryUsed { get; private set; } - - /** <inheritDoc /> */ - public long HeapMemoryCommitted { get; private set; } - - /** <inheritDoc /> */ - public long HeapMemoryMaximum { get; private set; } - - /** <inheritDoc /> */ - public long HeapMemoryTotal { get; private set; } - - /** <inheritDoc /> */ - public long NonHeapMemoryInitialized { get; private set; } - - /** <inheritDoc /> */ - public long NonHeapMemoryUsed { get; private set; } - - /** <inheritDoc /> */ - public long NonHeapMemoryCommitted { get; private set; } - - /** <inheritDoc /> */ - public long NonHeapMemoryMaximum { get; private set; } - - /** <inheritDoc /> */ - public long NonHeapMemoryTotal { get; private set; } - - /** <inheritDoc /> */ - public long UpTime { get; private set; } - - /** <inheritDoc /> */ - public DateTime StartTime { get; private set; } - - /** <inheritDoc /> */ - public DateTime NodeStartTime { get; private set; } - - /** <inheritDoc /> */ - public int CurrentThreadCount { get; private set; } - - /** <inheritDoc /> */ - public int MaximumThreadCount { get; private set; } - - /** <inheritDoc /> */ - public long TotalStartedThreadCount { get; private set; } - - /** <inheritDoc /> */ - public int CurrentDaemonThreadCount { get; private set; } - - /** <inheritDoc /> */ - public long LastDataVersion { get; private set; } - - /** <inheritDoc /> */ - public int SentMessagesCount { get; private set; } - - /** <inheritDoc /> */ - public long SentBytesCount { get; private set; } - - /** <inheritDoc /> */ - public int ReceivedMessagesCount { get; private set; } - - /** <inheritDoc /> */ - public long ReceivedBytesCount { get; private set; } - - /** <inheritDoc /> */ - public int OutboundMessagesQueueSize { get; private set; } - - /** <inheritDoc /> */ - public int TotalNodes { get; private set; } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs deleted file mode 100644 index da49feb..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cluster -{ - using System; - using System.Collections.Generic; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Impl.Collections; - using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// Cluster node implementation. - /// </summary> - internal class ClusterNodeImpl : IClusterNode - { - /** Node ID. */ - private readonly Guid _id; - - /** Attributes. */ - private readonly IDictionary<string, object> _attrs; - - /** Addresses. */ - private readonly ICollection<string> _addrs; - - /** Hosts. */ - private readonly ICollection<string> _hosts; - - /** Order. */ - private readonly long _order; - - /** Local flag. */ - private readonly bool _local; - - /** Daemon flag. */ - private readonly bool _daemon; - - /** Metrics. */ - private volatile ClusterMetricsImpl _metrics; - - /** Ignite reference. */ - private WeakReference _igniteRef; - - /// <summary> - /// Initializes a new instance of the <see cref="ClusterNodeImpl"/> class. - /// </summary> - /// <param name="reader">The reader.</param> - public ClusterNodeImpl(IPortableRawReader reader) - { - _id = reader.ReadGuid() ?? default(Guid); - - _attrs = reader.ReadGenericDictionary<string, object>().AsReadOnly(); - _addrs = reader.ReadGenericCollection<string>().AsReadOnly(); - _hosts = reader.ReadGenericCollection<string>().AsReadOnly(); - _order = reader.ReadLong(); - _local = reader.ReadBoolean(); - _daemon = reader.ReadBoolean(); - - _metrics = reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null; - } - - /** <inheritDoc /> */ - public Guid Id - { - get { return _id; } - } - - /** <inheritDoc /> */ - public T GetAttribute<T>(string name) - { - IgniteArgumentCheck.NotNull(name, "name"); - - return (T)_attrs[name]; - } - - /** <inheritDoc /> */ - public bool TryGetAttribute<T>(string name, out T attr) - { - IgniteArgumentCheck.NotNull(name, "name"); - - object val; - - if (_attrs.TryGetValue(name, out val)) - { - attr = (T)val; - - return true; - } - attr = default(T); - - return false; - } - - /** <inheritDoc /> */ - public IDictionary<string, object> GetAttributes() - { - return _attrs; - } - - /** <inheritDoc /> */ - public ICollection<string> Addresses - { - get - { - return _addrs; - } - } - - /** <inheritDoc /> */ - public ICollection<string> HostNames - { - get - { - return _hosts; - } - } - - /** <inheritDoc /> */ - public long Order - { - get - { - return _order; - } - } - - /** <inheritDoc /> */ - public bool IsLocal - { - get - { - return _local; - } - } - - /** <inheritDoc /> */ - public bool IsDaemon - { - get - { - return _daemon; - } - } - - /** <inheritDoc /> */ - public IClusterMetrics GetMetrics() - { - var ignite = (Ignite)_igniteRef.Target; - - if (ignite == null) - return _metrics; - - ClusterMetricsImpl oldMetrics = _metrics; - - long lastUpdateTime = oldMetrics.LastUpdateTimeRaw; - - ClusterMetricsImpl newMetrics = ignite.ClusterGroup.RefreshClusterNodeMetrics(_id, lastUpdateTime); - - if (newMetrics != null) - { - lock (this) - { - if (_metrics.LastUpdateTime < newMetrics.LastUpdateTime) - _metrics = newMetrics; - } - - return newMetrics; - } - - return oldMetrics; - } - - /** <inheritDoc /> */ - public override string ToString() - { - return "GridNode [id=" + Id + ']'; - } - - /** <inheritDoc /> */ - public override bool Equals(object obj) - { - ClusterNodeImpl node = obj as ClusterNodeImpl; - - if (node != null) - return _id.Equals(node._id); - - return false; - } - - /** <inheritDoc /> */ - public override int GetHashCode() - { - // ReSharper disable once NonReadonlyMemberInGetHashCode - return _id.GetHashCode(); - } - - /// <summary> - /// Initializes this instance with a grid. - /// </summary> - /// <param name="grid">The grid.</param> - internal void Init(Ignite grid) - { - _igniteRef = new WeakReference(grid); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs deleted file mode 100644 index 554eb0a..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cluster -{ - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Portable; - - /// <summary> - /// - /// </summary> - internal interface IClusterGroupEx : IClusterGroup - { - /// <summary> - /// Gets protable metadata for type. - /// </summary> - /// <param name="typeId">Type ID.</param> - /// <returns>Metadata.</returns> - IPortableMetadata Metadata(int typeId); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs deleted file mode 100644 index 57295cb..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Collections -{ - using System.Collections.Generic; - - /// <summary> - /// Collection extension methods. - /// </summary> - public static class CollectionExtensions - { - /// <summary> - /// Returns a read-only System.Collections.Generic.IDictionary{K, V} wrapper for the current collection. - /// </summary> - public static IDictionary<TKey, TValue> AsReadOnly<TKey, TValue>(this IDictionary<TKey, TValue> dict) - { - return new ReadOnlyDictionary<TKey, TValue>(dict); - } - - /// <summary> - /// Returns a read-only System.Collections.Generic.ICollection{K, V} wrapper for the current collection. - /// </summary> - public static ICollection<T> AsReadOnly<T>(this ICollection<T> col) - { - var list = col as List<T>; - - return list != null ? (ICollection<T>) list.AsReadOnly() : new ReadOnlyCollection<T>(col); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f0bac562/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs deleted file mode 100644 index bd7e895..0000000 --- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Collections -{ - using System.Collections.Generic; - - /// <summary> - /// Multiple-values-per-key dictionary. - /// </summary> - public class MultiValueDictionary<TKey, TValue> - { - /** Inner dictionary */ - private readonly Dictionary<TKey, object> _dict = new Dictionary<TKey, object>(); - - /// <summary> - /// Adds a value. - /// </summary> - /// <param name="key">The key.</param> - /// <param name="val">The value.</param> - public void Add(TKey key, TValue val) - { - object val0; - - if (_dict.TryGetValue(key, out val0)) - { - var list = val0 as List<TValue>; - - if (list != null) - list.Add(val); - else - _dict[key] = new List<TValue> {(TValue) val0, val}; - } - else - _dict[key] = val; - } - - /// <summary> - /// Tries the get a value. In case of multiple values for a key, returns the last one. - /// </summary> - /// <param name="key">The key.</param> - /// <param name="val">The value.</param> - /// <returns>True if value has been found for specified key; otherwise false.</returns> - public bool TryGetValue(TKey key, out TValue val) - { - object val0; - - if (!_dict.TryGetValue(key, out val0)) - { - val = default(TValue); - return false; - } - - var list = val0 as List<TValue>; - - if (list != null) - val = list[list.Count - 1]; - else - val = (TValue) val0; - - return true; - } - - /// <summary> - /// Removes the specified value for the specified key. - /// </summary> - /// <param name="key">The key.</param> - /// <param name="val">The value.</param> - public void Remove(TKey key, TValue val) - { - object val0; - - if (!_dict.TryGetValue(key, out val0)) - return; - - var list = val0 as List<TValue>; - - if (list != null) - { - list.Remove(val); - - if (list.Count == 0) - _dict.Remove(key); - } - else if (Equals(val0, val)) - _dict.Remove(key); - } - - /// <summary> - /// Removes the last value for the specified key and returns it. - /// </summary> - /// <param name="key">The key.</param> - /// <param name="val">The value.</param> - /// <returns>True if value has been found for specified key; otherwise false.</returns> - public bool TryRemove(TKey key, out TValue val) - { - object val0; - - if (!_dict.TryGetValue(key, out val0)) - { - val = default(TValue); - - return false; - } - - var list = val0 as List<TValue>; - - if (list != null) - { - var index = list.Count - 1; - - val = list[index]; - - list.RemoveAt(index); - - if (list.Count == 0) - _dict.Remove(key); - - return true; - } - - val = (TValue) val0; - - _dict.Remove(key); - - return true; - } - } -} \ No newline at end of file
