http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs new file mode 100644 index 0000000..0f4b5a3 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs @@ -0,0 +1,264 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Query
{
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics.CodeAnalysis;
    using Apache.Ignite.Core.Cache.Query;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Unmanaged;
    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;

    /// <summary>
    /// Abstract query cursor implementation.
    /// <para />
    /// The cursor acts as its own enumerator and can be consumed only once: either as a whole
    /// through <see cref="GetAll"/> or entry by entry through <see cref="GetEnumerator"/>.
    /// Subclasses define how a single entry is decoded by implementing <see cref="Read"/>.
    /// </summary>
    internal abstract class AbstractQueryCursor<T> : PlatformDisposableTarget, IQueryCursor<T>, IEnumerator<T>
    {
        /** Operation code: get all entries. */
        private const int OpGetAll = 1;

        /** Operation code: get next batch of entries. */
        private const int OpGetBatch = 2;

        /** Position before head. */
        private const int BatchPosBeforeHead = -1;

        /** Keep portable flag. */
        private readonly bool _keepPortable;

        /** Whether "GetAll" was called. */
        private bool _getAllCalled;

        /** Whether "GetEnumerator" was called. */
        private bool _iterCalled;

        /** Batch with entries; null before the first batch is requested and after the last one is consumed. */
        private T[] _batch;

        /** Current position in batch. */
        private int _batchPos = BatchPosBeforeHead;

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="target">Target.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        protected AbstractQueryCursor(IUnmanagedTarget target, PortableMarshaller marsh, bool keepPortable) :
            base(target, marsh)
        {
            _keepPortable = keepPortable;
        }

        #region Public methods

        /** <inheritdoc /> */
        public IList<T> GetAll()
        {
            ThrowIfDisposed();

            if (_iterCalled)
                throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " +
                    "method has already been called.");

            if (_getAllCalled)
                throw new InvalidOperationException("Failed to get all entries because GetAll() " +
                    "method has already been called.");

            var res = DoInOp<IList<T>>(OpGetAll, ConvertGetAll);

            // Flag is raised only after the operation succeeded.
            _getAllCalled = true;

            return res;
        }

        /** <inheritdoc /> */
        protected override void Dispose(bool disposing)
        {
            try
            {
                UU.QueryCursorClose(Target);
            }
            finally
            {
                // Base disposal must happen even if closing the cursor failed.
                base.Dispose(disposing);
            }
        }

        #endregion

        #region Public IEnumerable methods

        /** <inheritdoc /> */
        [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
        public IEnumerator<T> GetEnumerator()
        {
            ThrowIfDisposed();

            if (_iterCalled)
                throw new InvalidOperationException("Failed to get enumerator entries because " +
                    "GetEnumerator() method has already been called.");

            if (_getAllCalled)
                throw new InvalidOperationException("Failed to get enumerator entries because " +
                    "GetAll() method has already been called.");

            UU.QueryCursorIterator(Target);

            _iterCalled = true;

            // The cursor is its own enumerator.
            return this;
        }

        /** <inheritdoc /> */
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        #endregion

        #region Public IEnumerator methods

        /** <inheritdoc /> */
        public T Current
        {
            get
            {
                ThrowIfDisposed();

                if (_batchPos == BatchPosBeforeHead)
                    throw new InvalidOperationException("MoveNext has not been called.");

                if (_batch == null)
                    throw new InvalidOperationException("Previous call to MoveNext returned false.");

                return _batch[_batchPos];
            }
        }

        /** <inheritdoc /> */
        object IEnumerator.Current
        {
            get { return Current; }
        }

        /** <inheritdoc /> */
        public bool MoveNext()
        {
            ThrowIfDisposed();

            if (_batch == null)
            {
                if (_batchPos == BatchPosBeforeHead)
                    // Standing before head, let's get batch and advance position.
                    RequestBatch();

                // Otherwise the previous request returned no data: stay at the end.
            }
            else
            {
                _batchPos++;

                if (_batch.Length == _batchPos)
                    // Reached batch end => request another.
                    RequestBatch();
            }

            return _batch != null;
        }

        /** <inheritdoc /> */
        public void Reset()
        {
            throw new NotSupportedException("Reset is not supported.");
        }

        #endregion

        #region Non-public methods

        /// <summary>
        /// Read entry from the reader.
        /// </summary>
        /// <param name="reader">Reader.</param>
        /// <returns>Entry.</returns>
        protected abstract T Read(PortableReaderImpl reader);

        /** <inheritdoc /> */
        protected override T1 Unmarshal<T1>(IPortableStream stream)
        {
            return Marshaller.Unmarshal<T1>(stream, _keepPortable);
        }

        /// <summary>
        /// Request next batch and rewind the position to its first element.
        /// </summary>
        private void RequestBatch()
        {
            _batch = DoInOp<T[]>(OpGetBatch, ConvertGetBatch);

            _batchPos = 0;
        }

        /// <summary>
        /// Converter for GET_ALL operation.
        /// </summary>
        /// <param name="stream">Portable stream.</param>
        /// <returns>Result.</returns>
        private IList<T> ConvertGetAll(IPortableStream stream)
        {
            var reader = Marshaller.StartUnmarshal(stream, _keepPortable);

            var size = reader.ReadInt();

            var res = new List<T>(size);

            for (var i = 0; i < size; i++)
                res.Add(Read(reader));

            return res;
        }

        /// <summary>
        /// Converter for GET_BATCH operation.
        /// </summary>
        /// <param name="stream">Portable stream.</param>
        /// <returns>Batch of entries, or null when the stream reports zero entries.</returns>
        private T[] ConvertGetBatch(IPortableStream stream)
        {
            var reader = Marshaller.StartUnmarshal(stream, _keepPortable);

            var size = reader.ReadInt();

            if (size == 0)
                // Null batch is what MoveNext() treats as "no more data".
                return null;

            var res = new T[size];

            for (var i = 0; i < size; i++)
                res[i] = Read(reader);

            return res;
        }

        #endregion

    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs new file mode 100644 index 0000000..5738ed9 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilter.cs @@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
{
    using Apache.Ignite.Core.Cache.Event;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Resource;
    using CQU = ContinuousQueryUtils;

    /// <summary>
    /// Continuous query filter interface. Required to hide generic nature of underlying real filter.
    /// </summary>
    internal interface IContinuousQueryFilter
    {
        /// <summary>
        /// Evaluate filter.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <returns>Result.</returns>
        bool Evaluate(IPortableStream stream);

        /// <summary>
        /// Inject grid.
        /// </summary>
        /// <param name="grid">Grid.</param>
        void Inject(Ignite grid);

        /// <summary>
        /// Allocate handle for the filter.
        /// </summary>
        /// <returns>Handle.</returns>
        long Allocate();

        /// <summary>
        /// Release filter.
        /// </summary>
        void Release();
    }

    /// <summary>
    /// Continuous query filter generic implementation.
    /// </summary>
    internal class ContinuousQueryFilter<TK, TV> : IContinuousQueryFilter
    {
        /** Actual filter. */
        private readonly ICacheEntryEventFilter<TK, TV> _filter;

        /** Keep portable flag. */
        private readonly bool _keepPortable;

        /** Guards handle allocation and release; private so that no outside code can contend on it. */
        private readonly object _syncRoot = new object();

        /** Ignite hosting the filter. */
        private volatile Ignite _ignite;

        /** GC handle. */
        private long? _hnd;

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="filter">Actual filter.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        public ContinuousQueryFilter(ICacheEntryEventFilter<TK, TV> filter, bool keepPortable)
        {
            _filter = filter;
            _keepPortable = keepPortable;
        }

        /** <inheritDoc /> */
        public bool Evaluate(IPortableStream stream)
        {
            // Uses the marshaller of the injected grid, so Inject() has to be called first.
            ICacheEntryEvent<TK, TV> evt = CQU.ReadEvent<TK, TV>(stream, _ignite.Marshaller, _keepPortable);

            return _filter.Evaluate(evt);
        }

        /** <inheritDoc /> */
        public void Inject(Ignite grid)
        {
            _ignite = grid;

            ResourceProcessor.Inject(_filter, grid);
        }

        /** <inheritDoc /> */
        public long Allocate()
        {
            lock (_syncRoot)
            {
                // Handle is allocated lazily and only once; repeated calls return the same value.
                if (!_hnd.HasValue)
                    _hnd = _ignite.HandleRegistry.Allocate(this);

                return _hnd.Value;
            }
        }

        /** <inheritDoc /> */
        public void Release()
        {
            lock (_syncRoot)
            {
                // No-op when nothing has been allocated or the handle was already released.
                if (_hnd.HasValue)
                {
                    _ignite.HandleRegistry.Release(_hnd.Value);

                    _hnd = null;
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs new file mode 100644 index 0000000..65da674 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryFilterHolder.cs @@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
{
    using System;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Holder of a continuous query remote filter. The real filter is wrapped into
    /// a portable object together with its key/value types and the keep-portable flag,
    /// which makes it possible to send it over the wire to another node.
    /// </summary>
    public class ContinuousQueryFilterHolder : IPortableWriteAware
    {
        /** Key type. */
        private readonly Type _keyTyp;

        /** Value type. */
        private readonly Type _valTyp;

        /** Filter object. */
        private readonly object _filter;

        /** Keep portable flag. */
        private readonly bool _keepPortable;

        /// <summary>
        /// Creates a holder from explicitly provided parts.
        /// </summary>
        /// <param name="keyTyp">Key type.</param>
        /// <param name="valTyp">Value type.</param>
        /// <param name="filter">Filter.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        public ContinuousQueryFilterHolder(Type keyTyp, Type valTyp, object filter, bool keepPortable)
        {
            _keyTyp = keyTyp;
            _valTyp = valTyp;
            _filter = filter;
            _keepPortable = keepPortable;
        }

        /// <summary>
        /// Creates a holder by reading its state from the raw reader.
        /// Fields are read in the same order <see cref="WritePortable"/> writes them.
        /// </summary>
        /// <param name="reader">The reader.</param>
        public ContinuousQueryFilterHolder(IPortableReader reader)
        {
            var raw = (PortableReaderImpl) reader.RawReader();

            _keyTyp = PortableUtils.ReadPortableOrSerializable<Type>(raw);
            _valTyp = PortableUtils.ReadPortableOrSerializable<Type>(raw);
            _filter = PortableUtils.ReadPortableOrSerializable<object>(raw);
            _keepPortable = raw.ReadBoolean();
        }

        /// <summary>
        /// Key type.
        /// </summary>
        internal Type KeyType
        {
            get { return _keyTyp; }
        }

        /// <summary>
        /// Value type.
        /// </summary>
        internal Type ValueType
        {
            get { return _valTyp; }
        }

        /// <summary>
        /// Filter.
        /// </summary>
        internal object Filter
        {
            get { return _filter; }
        }

        /// <summary>
        /// Keep portable flag.
        /// </summary>
        internal bool KeepPortable
        {
            get { return _keepPortable; }
        }

        /// <summary>
        /// Writes key type, value type, filter and keep-portable flag (in this order)
        /// through the raw writer of the given writer.
        /// </summary>
        /// <param name="writer">Writer.</param>
        public void WritePortable(IPortableWriter writer)
        {
            var raw = (PortableWriterImpl) writer.RawWriter();

            PortableUtils.WritePortableOrSerializable(raw, _keyTyp);
            PortableUtils.WritePortableOrSerializable(raw, _valTyp);
            PortableUtils.WritePortableOrSerializable(raw, _filter);

            raw.WriteBoolean(_keepPortable);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs new file mode 100644 index 0000000..7a1b544 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
{
    using System;
    using System.Diagnostics;
    using Apache.Ignite.Core.Cache;
    using Apache.Ignite.Core.Cache.Event;
    using Apache.Ignite.Core.Cache.Query;
    using Apache.Ignite.Core.Cache.Query.Continuous;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Resource;
    using Apache.Ignite.Core.Impl.Unmanaged;
    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
    using CQU = ContinuousQueryUtils;

    /// <summary>
    /// Non-generic view of a continuous query handle.
    /// </summary>
    internal interface IContinuousQueryHandleImpl : IDisposable
    {
        /// <summary>
        /// Processes a callback: events are read from the stream and handed to the listener.
        /// </summary>
        /// <param name="stream">Stream.</param>
        void Apply(IPortableStream stream);
    }

    /// <summary>
    /// Continuous query handle. Besides being the handle returned to the user it also plays
    /// the role of the filter for the query, delegating evaluation to the real filter.
    /// </summary>
    internal class ContinuousQueryHandleImpl<TK, TV> : IContinuousQueryHandleImpl, IContinuousQueryFilter,
        IContinuousQueryHandle<ICacheEntry<TK, TV>>
    {
        /** Marshaller. */
        private readonly PortableMarshaller _marsh;

        /** Keep portable flag. */
        private readonly bool _keepPortable;

        /** Real listener. */
        private readonly ICacheEntryEventListener<TK, TV> _lsnr;

        /** Real filter. */
        private readonly ICacheEntryEventFilter<TK, TV> _filter;

        /** GC handle. */
        private long _hnd;

        /** Native query. */
        private volatile IUnmanagedTarget _nativeQry;

        /** Initial query cursor; reset to null once handed out. */
        private volatile IQueryCursor<ICacheEntry<TK, TV>> _initialQueryCursor;

        /** Disposed flag. */
        private bool _disposed;

        /// <summary>
        /// Constructor. Listener and filter are taken from the query.
        /// </summary>
        /// <param name="qry">Query.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, PortableMarshaller marsh, bool keepPortable)
        {
            _lsnr = qry.Listener;
            _filter = qry.Filter;

            _marsh = marsh;
            _keepPortable = keepPortable;
        }

        /// <summary>
        /// Start execution.
        /// </summary>
        /// <param name="grid">Ignite instance.</param>
        /// <param name="writer">Writer.</param>
        /// <param name="cb">Callback invoked when all necessary data is written to stream.</param>
        /// <param name="qry">Query.</param>
        public void Start(Ignite grid, PortableWriterImpl writer, Func<IUnmanagedTarget> cb,
            ContinuousQuery<TK, TV> qry)
        {
            // Resources go into both user-provided objects first.
            ResourceProcessor.Inject(_lsnr, grid);
            ResourceProcessor.Inject(_filter, grid);

            // This handle is registered so that it can be referred to by the written value.
            _hnd = grid.HandleRegistry.Allocate(this);

            // Query description is written to the stream.
            writer.WriteLong(_hnd);
            writer.WriteBoolean(qry.Local);
            writer.WriteBoolean(_filter != null);

            // Holder is written only when there is a filter and the query is not local.
            ContinuousQueryFilterHolder holder = null;

            if (_filter != null && !qry.Local)
                holder = new ContinuousQueryFilterHolder(typeof (TK), typeof (TV), _filter, _keepPortable);

            writer.WriteObject(holder);

            writer.WriteInt(qry.BufferSize);
            writer.WriteLong((long)qry.TimeInterval.TotalMilliseconds);
            writer.WriteBoolean(qry.AutoUnsubscribe);

            // Everything is written, the callback produces the native query.
            _nativeQry = cb();

            // Initial query cursor is wrapped when the native side provides one.
            var nativeCursor = UU.ContinuousQueryGetInitialQueryCursor(_nativeQry);

            if (nativeCursor == null)
                _initialQueryCursor = null;
            else
                _initialQueryCursor = new QueryCursor<TK, TV>(nativeCursor, _marsh, _keepPortable);
        }

        /** <inheritdoc /> */
        public void Apply(IPortableStream stream)
        {
            _lsnr.OnEvent(CQU.ReadEvents<TK, TV>(stream, _marsh, _keepPortable));
        }

        /** <inheritdoc /> */
        public bool Evaluate(IPortableStream stream)
        {
            Debug.Assert(_filter != null, "Evaluate should not be called if filter is not set.");

            return _filter.Evaluate(CQU.ReadEvent<TK, TV>(stream, _marsh, _keepPortable));
        }

        /** <inheritdoc /> */
        public void Inject(Ignite grid)
        {
            throw new NotSupportedException("Should not be called.");
        }

        /** <inheritdoc /> */
        public long Allocate()
        {
            throw new NotSupportedException("Should not be called.");
        }

        /** <inheritdoc /> */
        public void Release()
        {
            _marsh.Ignite.HandleRegistry.Release(_hnd);
        }

        /** <inheritdoc /> */
        public IQueryCursor<ICacheEntry<TK, TV>> InitialQueryCursor
        {
            get { return GetInitialQueryCursor(); }
        }

        /** <inheritdoc /> */
        public IQueryCursor<ICacheEntry<TK, TV>> GetInitialQueryCursor()
        {
            lock (this)
            {
                if (_disposed)
                    throw new ObjectDisposedException("Continuous query handle has been disposed.");

                var cursor = _initialQueryCursor;

                if (cursor != null)
                {
                    // Cursor is handed out once; the field is cleared for subsequent calls.
                    _initialQueryCursor = null;

                    return cursor;
                }

                throw new InvalidOperationException("GetInitialQueryCursor() can be called only once.");
            }
        }

        /** <inheritdoc /> */
        public void Dispose()
        {
            lock (this)
            {
                if (!_disposed)
                {
                    Debug.Assert(_nativeQry != null);

                    try
                    {
                        UU.ContinuousQueryClose(_nativeQry);
                    }
                    finally
                    {
                        // Native target is disposed and the flag is set even if closing failed.
                        _nativeQry.Dispose();

                        _disposed = true;
                    }
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs ---------------------------------------------------------------------- diff --git
a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs new file mode 100644 index 0000000..86c8300 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs @@ -0,0 +1,115 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
{
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using Apache.Ignite.Core.Cache.Event;
    using Apache.Ignite.Core.Impl.Cache.Event;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.IO;

    /// <summary>
    /// Helpers for reading and creating continuous query events.
    /// </summary>
    internal static class ContinuousQueryUtils
    {
        /// <summary>
        /// Reads a single event from the stream.
        /// </summary>
        /// <param name="stream">Stream to read data from.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        /// <returns>Event.</returns>
        public static ICacheEntryEvent<TK, TV> ReadEvent<TK, TV>(IPortableStream stream,
            PortableMarshaller marsh, bool keepPortable)
        {
            return ReadEvent0<TK, TV>(marsh.StartUnmarshal(stream, keepPortable));
        }

        /// <summary>
        /// Reads an event count followed by that many events from the stream.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        /// <returns>Events.</returns>
        [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
        public static ICacheEntryEvent<TK, TV>[] ReadEvents<TK, TV>(IPortableStream stream,
            PortableMarshaller marsh, bool keepPortable)
        {
            var reader = marsh.StartUnmarshal(stream, keepPortable);

            var result = new ICacheEntryEvent<TK, TV>[reader.ReadInt()];

            for (var idx = 0; idx < result.Length; idx++)
                result[idx] = ReadEvent0<TK, TV>(reader);

            return result;
        }

        /// <summary>
        /// Reads key, old value and new value (in this order) and builds an event from them.
        /// </summary>
        /// <param name="reader">Reader.</param>
        /// <returns>Event.</returns>
        private static ICacheEntryEvent<TK, TV> ReadEvent0<TK, TV>(PortableReaderImpl reader)
        {
            // Arguments are evaluated left to right, so the read order is key, old value, value.
            return CreateEvent(
                ReadDetached<TK>(reader),
                ReadDetached<TV>(reader),
                ReadDetached<TV>(reader));
        }

        /// <summary>
        /// Calls DetachNext() on the reader and then reads the next object.
        /// </summary>
        /// <param name="reader">Reader.</param>
        /// <returns>Object.</returns>
        private static T ReadDetached<T>(PortableReaderImpl reader)
        {
            reader.DetachNext();

            return reader.ReadObject<T>();
        }

        /// <summary>
        /// Creates an event matching the given values: create event when there is no old value,
        /// remove event when there is no new value, update event otherwise.
        /// </summary>
        /// <param name="key">Key.</param>
        /// <param name="oldVal">Old value.</param>
        /// <param name="val">Value.</param>
        /// <returns>Event.</returns>
        public static ICacheEntryEvent<TK, TV> CreateEvent<TK, TV>(TK key, TV oldVal, TV val)
        {
            if (oldVal == null)
            {
                Debug.Assert(val != null);

                return new CacheEntryCreateEvent<TK, TV>(key, val);
            }

            // Old value is known to be present from here on.
            if (val != null)
                return new CacheEntryUpdateEvent<TK, TV>(key, oldVal, val);

            return new CacheEntryRemoveEvent<TK, TV>(key, oldVal);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs new file mode 100644 index 0000000..f38346c --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

namespace Apache.Ignite.Core.Impl.Cache.Query
{
    using System.Collections;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Unmanaged;

    /// <summary>
    /// Cursor for fields queries. Every entry is a list of field values.
    /// </summary>
    internal class FieldsQueryCursor : AbstractQueryCursor<IList>
    {
        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="target">Target.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        public FieldsQueryCursor(IUnmanagedTarget target, PortableMarshaller marsh, bool keepPortable)
            : base(target, marsh, keepPortable)
        {
            // No-op.
        }

        /** <inheritdoc /> */
        protected override IList Read(PortableReaderImpl reader)
        {
            // Entry layout: field count followed by that many objects.
            var fieldCnt = reader.ReadInt();

            var row = new ArrayList(fieldCnt);

            while (row.Count < fieldCnt)
                row.Add(reader.ReadObject<object>());

            return row;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs new file mode 100644 index 0000000..0b113f5 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

namespace Apache.Ignite.Core.Impl.Cache.Query
{
    using Apache.Ignite.Core.Cache;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Unmanaged;

    /// <summary>
    /// Cursor for entry-based queries. Every entry is a key-value pair.
    /// </summary>
    internal class QueryCursor<TK, TV> : AbstractQueryCursor<ICacheEntry<TK, TV>>
    {
        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="target">Target.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Keep portable flag.</param>
        public QueryCursor(IUnmanagedTarget target, PortableMarshaller marsh,
            bool keepPortable) : base(target, marsh, keepPortable)
        {
            // No-op.
        }

        /** <inheritdoc /> */
        protected override ICacheEntry<TK, TV> Read(PortableReaderImpl reader)
        {
            // Arguments are evaluated left to right: key is read first, value second.
            return new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs new file mode 100644 index 0000000..3fbc705 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cache/Store/CacheStore.cs @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Store
{
    using System.Collections;
    using System.Diagnostics;
    using Apache.Ignite.Core.Cache.Store;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Impl.Handle;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Resource;
    using Apache.Ignite.Core.Impl.Unmanaged;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Interop cache store: wraps a user <see cref="ICacheStore"/> and dispatches
    /// operations read from a portable stream to it.
    /// </summary>
    internal class CacheStore
    {
        /** Operation: load cache. */
        private const byte OpLoadCache = 0;

        /** Operation: load single value. */
        private const byte OpLoad = 1;

        /** Operation: load multiple values. */
        private const byte OpLoadAll = 2;

        /** Operation: write single entry. */
        private const byte OpPut = 3;

        /** Operation: write multiple entries. */
        private const byte OpPutAll = 4;

        /** Operation: delete single entry. */
        private const byte OpRmv = 5;

        /** Operation: delete multiple entries. */
        private const byte OpRmvAll = 6;

        /** Operation: session end. */
        private const byte OpSesEnd = 7;

        /** Whether incoming portable objects are deserialized before reaching the store. */
        private readonly bool _convertPortable;

        /** Store. */
        private readonly ICacheStore _store;

        /** Session. */
        private readonly CacheStoreSessionProxy _sesProxy;

        /** Handle allocated for this instance in the handle registry. */
        private readonly long _handle;

        /// <summary>
        /// Initializes a new instance of the <see cref="CacheStore" /> class.
        /// </summary>
        /// <param name="store">Store.</param>
        /// <param name="convertPortable">Whether to convert portable objects.</param>
        /// <param name="registry">The handle registry.</param>
        private CacheStore(ICacheStore store, bool convertPortable, HandleRegistry registry)
        {
            Debug.Assert(store != null);

            _store = store;
            _convertPortable = convertPortable;

            _sesProxy = new CacheStoreSessionProxy();

            ResourceProcessor.InjectStoreSession(store, _sesProxy);

            _handle = registry.AllocateCritical(this);
        }

        /// <summary>
        /// Creates interop cache store from a stream.
        /// </summary>
        /// <param name="memPtr">Memory pointer.</param>
        /// <param name="registry">The handle registry.</param>
        /// <returns>
        /// Interop cache store.
        /// </returns>
        internal static CacheStore CreateInstance(long memPtr, HandleRegistry registry)
        {
            using (var stream = IgniteManager.Memory.Get(memPtr).Stream())
            {
                var reader = PortableUtils.Marshaller.StartUnmarshal(stream, PortableMode.KeepPortable);

                // Read order is fixed: assembly, class, convert flag, property map.
                var assemblyName = reader.ReadString();
                var className = reader.ReadString();
                var convertPortable = reader.ReadBoolean();
                var propertyMap = reader.ReadGenericDictionary<string, object>();

                var store = (ICacheStore) IgniteUtils.CreateInstance(assemblyName, className);

                IgniteUtils.SetProperties(store, propertyMap);

                return new CacheStore(store, convertPortable, registry);
            }
        }

        /// <summary>
        /// Gets the handle.
        /// </summary>
        public long Handle
        {
            get { return _handle; }
        }

        /// <summary>
        /// Initializes this instance with a grid.
        /// </summary>
        /// <param name="grid">Grid.</param>
        public void Init(Ignite grid)
        {
            ResourceProcessor.Inject(_store, grid);
        }

        /// <summary>
        /// Invokes a store operation. The stream is read as: operation type (byte),
        /// session id (long), cache name (string), then operation-specific arguments.
        /// </summary>
        /// <param name="input">Input stream.</param>
        /// <param name="cb">Callback.</param>
        /// <param name="grid">Grid.</param>
        /// <returns>Invocation result; always 0 when the operation completes.</returns>
        /// <exception cref="IgniteException">The operation type read from the stream is unknown.</exception>
        public int Invoke(IPortableStream input, IUnmanagedTarget cb, Ignite grid)
        {
            IPortableReader reader = grid.Marshaller.StartUnmarshal(input,
                _convertPortable ? PortableMode.Deserialize : PortableMode.ForcePortable);

            IPortableRawReader rawReader = reader.RawReader();

            int opType = rawReader.ReadByte();

            // Setup cache session for this invocation.
            long sesId = rawReader.ReadLong();

            CacheStoreSession ses = grid.HandleRegistry.Get<CacheStoreSession>(sesId, true);

            ses.CacheName = rawReader.ReadString();

            _sesProxy.SetSession(ses);

            try
            {
                // Perform operation.
                switch (opType)
                {
                    case OpLoadCache:
                        _store.LoadCache((k, v) => WriteObjects(cb, grid, k, v), rawReader.ReadObjectArray<object>());

                        break;

                    case OpLoad:
                        object val = _store.Load(rawReader.ReadObject<object>());

                        if (val != null)
                            WriteObjects(cb, grid, val);

                        break;

                    case OpLoadAll:
                        var keys = rawReader.ReadCollection();

                        var result = _store.LoadAll(keys);

                        // A store that found nothing may return null; treat it as an empty result
                        // instead of failing with NullReferenceException.
                        if (result != null)
                        {
                            foreach (DictionaryEntry entry in result)
                                WriteObjects(cb, grid, entry.Key, entry.Value);
                        }

                        break;

                    case OpPut:
                        // Arguments are evaluated left to right: key first, then value.
                        _store.Write(rawReader.ReadObject<object>(), rawReader.ReadObject<object>());

                        break;

                    case OpPutAll:
                        _store.WriteAll(rawReader.ReadDictionary());

                        break;

                    case OpRmv:
                        _store.Delete(rawReader.ReadObject<object>());

                        break;

                    case OpRmvAll:
                        _store.DeleteAll(rawReader.ReadCollection());

                        break;

                    case OpSesEnd:
                        grid.HandleRegistry.Release(sesId);

                        _store.SessionEnd(rawReader.ReadBoolean());

                        break;

                    default:
                        throw new IgniteException("Invalid operation type: " + opType);
                }

                return 0;
            }
            finally
            {
                // Always unbind the session from the current thread, even on failure.
                _sesProxy.ClearSession();
            }
        }

        /// <summary>
        /// Marshals the given objects into a freshly allocated stream and, when a callback
        /// is provided, hands the stream memory pointer to it.
        /// </summary>
        /// <param name="cb">Optional callback.</param>
        /// <param name="grid">Grid.</param>
        /// <param name="objects">Objects.</param>
        private static void WriteObjects(IUnmanagedTarget cb, Ignite grid, params object[] objects)
        {
            using (var stream = IgniteManager.Memory.Allocate().Stream())
            {
                PortableWriterImpl writer = grid.Marshaller.StartMarshal(stream);

                try
                {
                    foreach (var obj in objects)
                    {
                        writer.DetachNext();
                        writer.WriteObject(obj);
                    }
                }
                finally
                {
                    grid.Marshaller.FinishMarshal(writer);
                }

                if (cb != null)
                {
                    stream.SynchronizeOutput();

                    UnmanagedUtils.CacheStoreCallbackInvoke(cb, stream.MemoryPointer);
                }
            }
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Store
{
    using System.Collections.Generic;
    using Apache.Ignite.Core.Cache.Store;

    /// <summary>
    /// Store session implementation: a cache name plus a lazily created property bag.
    /// </summary>
    internal class CacheStoreSession : ICacheStoreSession
    {
        /** Property bag; stays null until Properties is read for the first time. */
        private IDictionary<object, object> _props;

        /** <inheritdoc /> */
        public string CacheName { get; internal set; }

        /** <inheritdoc /> */
        public IDictionary<object, object> Properties
        {
            get
            {
                // Allocate on first access only.
                if (_props == null)
                    _props = new Dictionary<object, object>(2);

                return _props;
            }
        }

        /// <summary>
        /// Removes all properties. Does nothing if the property bag was never created.
        /// </summary>
        public void Clear()
        {
            var props = _props;

            if (props == null)
                return;

            props.Clear();
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cache.Store
{
    using System.Collections.Generic;
    using System.Diagnostics.CodeAnalysis;
    using System.Threading;
    using Apache.Ignite.Core.Cache.Store;

    /// <summary>
    /// Store session proxy: forwards <see cref="ICacheStoreSession"/> members to the
    /// session currently bound to the calling thread.
    /// </summary>
    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
    internal class CacheStoreSessionProxy : ICacheStoreSession
    {
        /** Per-thread session slot; holds null while no session is bound. */
        private readonly ThreadLocal<CacheStoreSession> _target = new ThreadLocal<CacheStoreSession>();

        /** <inheritdoc /> */
        public string CacheName
        {
            get
            {
                CacheStoreSession current = _target.Value;

                return current.CacheName;
            }
        }

        /** <inheritdoc /> */
        public IDictionary<object, object> Properties
        {
            get
            {
                CacheStoreSession current = _target.Value;

                return current.Properties;
            }
        }

        /// <summary>
        /// Binds the given session to the calling thread.
        /// </summary>
        /// <param name="ses">Session.</param>
        internal void SetSession(CacheStoreSession ses)
        {
            _target.Value = ses;
        }

        /// <summary>
        /// Unbinds the session from the calling thread.
        /// </summary>
        internal void ClearSession()
        {
            _target.Value = null;
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cluster
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.Linq;
    using System.Threading;
    using Apache.Ignite.Core.Cluster;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Compute;
    using Apache.Ignite.Core.Events;
    using Apache.Ignite.Core.Impl.Common;
    using Apache.Ignite.Core.Impl.Compute;
    using Apache.Ignite.Core.Impl.Events;
    using Apache.Ignite.Core.Impl.Messaging;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.Metadata;
    using Apache.Ignite.Core.Impl.Services;
    using Apache.Ignite.Core.Impl.Unmanaged;
    using Apache.Ignite.Core.Messaging;
    using Apache.Ignite.Core.Portable;
    using Apache.Ignite.Core.Services;
    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;

    /// <summary>
    /// Ignite projection implementation.
    /// </summary>
    internal class ClusterGroupImpl : PlatformTarget, IClusterGroupEx
    {
        /** Attribute: platform. */
        private const string AttrPlatform = "org.apache.ignite.platform";

        /** Platform. */
        private const string Platform = "dotnet";

        /** Initial topver; invalid from Java perspective, so update will be triggered when this value is met. */
        private const int TopVerInit = 0;

        /** */
        private const int OpAllMetadata = 1;

        /** */
        private const int OpForAttribute = 2;

        /** */
        private const int OpForCache = 3;

        /** */
        private const int OpForClient = 4;

        /** */
        private const int OpForData = 5;

        /** */
        private const int OpForHost = 6;

        /** */
        private const int OpForNodeIds = 7;

        /** */
        private const int OpMetadata = 8;

        /** */
        private const int OpMetrics = 9;

        /** */
        private const int OpMetricsFiltered = 10;

        /** */
        private const int OpNodeMetrics = 11;

        /** */
        private const int OpNodes = 12;

        /** */
        private const int OpPingNode = 13;

        /** */
        private const int OpTopology = 14;

        /** Initial Ignite instance. */
        private readonly Ignite _ignite;

        /** Predicate. */
        private readonly Func<IClusterNode, bool> _pred;

        /** Guards topology updates; private so that outside code cannot contend on it. */
        private readonly object _topSync = new object();

        /** Topology version. */
        [SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")]
        private long _topVer = TopVerInit;

        /** Nodes for the given topology version. */
        private volatile IList<IClusterNode> _nodes;

        /** Processor. */
        private readonly IUnmanagedTarget _proc;

        /** Compute. */
        private readonly Lazy<Compute> _comp;

        /** Messaging. */
        private readonly Lazy<Messaging> _msg;

        /** Events. */
        private readonly Lazy<Events> _events;

        /** Services. */
        private readonly Lazy<IServices> _services;

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="proc">Processor.</param>
        /// <param name="target">Target.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="ignite">Grid.</param>
        /// <param name="pred">Predicate.</param>
        public ClusterGroupImpl(IUnmanagedTarget proc, IUnmanagedTarget target, PortableMarshaller marsh,
            Ignite ignite, Func<IClusterNode, bool> pred)
            : base(target, marsh)
        {
            _proc = proc;
            _ignite = ignite;
            _pred = pred;

            _comp = new Lazy<Compute>(() =>
                new Compute(new ComputeImpl(UU.ProcessorCompute(proc, target), marsh, this, false)));

            _msg = new Lazy<Messaging>(() => new Messaging(UU.ProcessorMessage(proc, target), marsh, this));

            _events = new Lazy<Events>(() => new Events(UU.ProcessorEvents(proc, target), marsh, this));

            _services = new Lazy<IServices>(() =>
                new Services(UU.ProcessorServices(proc, target), marsh, this, false, false));
        }

        /** <inheritDoc /> */
        public IIgnite Ignite
        {
            get { return _ignite; }
        }

        /** <inheritDoc /> */
        public ICompute Compute()
        {
            return _comp.Value;
        }

        /** <inheritDoc /> */
        public IClusterGroup ForNodes(IEnumerable<IClusterNode> nodes)
        {
            IgniteArgumentCheck.NotNull(nodes, "nodes");

            return ForNodeIds0(nodes, node => node.Id);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForNodes(params IClusterNode[] nodes)
        {
            IgniteArgumentCheck.NotNull(nodes, "nodes");

            return ForNodeIds0(nodes, node => node.Id);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForNodeIds(IEnumerable<Guid> ids)
        {
            IgniteArgumentCheck.NotNull(ids, "ids");

            return ForNodeIds0(ids, null);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForNodeIds(params Guid[] ids)
        {
            IgniteArgumentCheck.NotNull(ids, "ids");

            return ForNodeIds0(ids, null);
        }

        /// <summary>
        /// Internal routine to get projection for specific node IDs.
        /// </summary>
        /// <typeparam name="T">Item type.</typeparam>
        /// <param name="items">Items.</param>
        /// <param name="func">Function to transform item to Guid (optional).</param>
        /// <returns>Projection over the given nodes.</returns>
        private IClusterGroup ForNodeIds0<T>(IEnumerable<T> items, Func<T, Guid> func)
        {
            Debug.Assert(items != null);

            IUnmanagedTarget prj = DoProjetionOutOp(OpForNodeIds, writer =>
            {
                WriteEnumerable(writer, items, func);
            });

            return GetClusterGroup(prj);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForPredicate(Func<IClusterNode, bool> p)
        {
            // Validate eagerly, like the other For* methods: a null predicate combined with an
            // existing one would otherwise fail only later, while nodes are being filtered.
            IgniteArgumentCheck.NotNull(p, "p");

            var newPred = _pred == null ? p : node => _pred(node) && p(node);

            return new ClusterGroupImpl(_proc, Target, Marshaller, _ignite, newPred);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForAttribute(string name, string val)
        {
            IgniteArgumentCheck.NotNull(name, "name");

            IUnmanagedTarget prj = DoProjetionOutOp(OpForAttribute, writer =>
            {
                writer.WriteString(name);
                writer.WriteString(val);
            });

            return GetClusterGroup(prj);
        }

        /// <summary>
        /// Creates projection with a specified op.
        /// </summary>
        /// <param name="name">Cache name to include into projection.</param>
        /// <param name="op">Operation id.</param>
        /// <returns>
        /// Projection over nodes that have specified cache running.
        /// </returns>
        private IClusterGroup ForCacheNodes(string name, int op)
        {
            IUnmanagedTarget prj = DoProjetionOutOp(op, writer =>
            {
                writer.WriteString(name);
            });

            return GetClusterGroup(prj);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForCacheNodes(string name)
        {
            return ForCacheNodes(name, OpForCache);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForDataNodes(string name)
        {
            return ForCacheNodes(name, OpForData);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForClientNodes(string name)
        {
            return ForCacheNodes(name, OpForClient);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForRemotes()
        {
            return GetClusterGroup(UU.ProjectionForRemotes(Target));
        }

        /** <inheritDoc /> */
        public IClusterGroup ForHost(IClusterNode node)
        {
            IgniteArgumentCheck.NotNull(node, "node");

            IUnmanagedTarget prj = DoProjetionOutOp(OpForHost, writer =>
            {
                writer.WriteGuid(node.Id);
            });

            return GetClusterGroup(prj);
        }

        /** <inheritDoc /> */
        public IClusterGroup ForRandom()
        {
            return GetClusterGroup(UU.ProjectionForRandom(Target));
        }

        /** <inheritDoc /> */
        public IClusterGroup ForOldest()
        {
            return GetClusterGroup(UU.ProjectionForOldest(Target));
        }

        /** <inheritDoc /> */
        public IClusterGroup ForYoungest()
        {
            return GetClusterGroup(UU.ProjectionForYoungest(Target));
        }

        /** <inheritDoc /> */
        public IClusterGroup ForDotNet()
        {
            return ForAttribute(AttrPlatform, Platform);
        }

        /** <inheritDoc /> */
        public ICollection<IClusterNode> Nodes()
        {
            return RefreshNodes();
        }

        /** <inheritDoc /> */
        public IClusterNode Node(Guid id)
        {
            return Nodes().FirstOrDefault(node => node.Id == id);
        }

        /** <inheritDoc /> */
        public IClusterNode Node()
        {
            return Nodes().FirstOrDefault();
        }

        /** <inheritDoc /> */
        public IClusterMetrics Metrics()
        {
            if (_pred == null)
            {
                return DoInOp(OpMetrics, stream =>
                {
                    return ReadMetrics(Marshaller.StartUnmarshal(stream, false));
                });
            }

            // With a predicate, the ids of the nodes that passed it are sent with the request.
            return DoOutInOp(OpMetricsFiltered, writer =>
            {
                WriteEnumerable(writer, Nodes().Select(node => node.Id));
            }, stream =>
            {
                return ReadMetrics(Marshaller.StartUnmarshal(stream, false));
            });
        }

        /** <inheritDoc /> */
        public IMessaging Message()
        {
            return _msg.Value;
        }

        /** <inheritDoc /> */
        public IEvents Events()
        {
            return _events.Value;
        }

        /** <inheritDoc /> */
        public IServices Services()
        {
            return _services.Value;
        }

        /// <summary>
        /// Pings a remote node.
        /// </summary>
        /// <param name="nodeId">ID of a node to ping.</param>
        /// <returns>True if node for a given ID is alive, false otherwise.</returns>
        internal bool PingNode(Guid nodeId)
        {
            return DoOutOp(OpPingNode, nodeId) == True;
        }

        /// <summary>
        /// Predicate (if any).
        /// </summary>
        public Func<IClusterNode, bool> Predicate
        {
            get { return _pred; }
        }

        /// <summary>
        /// Refresh cluster node metrics.
        /// </summary>
        /// <param name="nodeId">Node ID.</param>
        /// <param name="lastUpdateTime">Last update time of the metrics the caller already has.</param>
        /// <returns>Metrics read from the response, or null when the response carries none.</returns>
        internal ClusterMetricsImpl RefreshClusterNodeMetrics(Guid nodeId, long lastUpdateTime)
        {
            return DoOutInOp(OpNodeMetrics, writer =>
                {
                    writer.WriteGuid(nodeId);
                    writer.WriteLong(lastUpdateTime);
                }, stream =>
                {
                    return ReadMetrics(Marshaller.StartUnmarshal(stream, false));
                }
            );
        }

        /// <summary>
        /// Gets a topology by version. Returns null if topology history storage doesn't contain
        /// specified topology version (history currently keeps the last 1000 snapshots).
        /// </summary>
        /// <param name="version">Topology version.</param>
        /// <returns>Collection of Ignite nodes which represented by specified topology version,
        /// if it is present in history storage, <c>null</c> otherwise.</returns>
        /// <exception cref="IgniteException">If underlying SPI implementation does not support
        /// topology history. Currently only <c>org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi</c>
        /// supports topology history.</exception>
        internal ICollection<IClusterNode> Topology(long version)
        {
            return DoOutInOp(OpTopology, writer => writer.WriteLong(version),
                input => IgniteUtils.ReadNodes(Marshaller.StartUnmarshal(input)));
        }

        /// <summary>
        /// Topology version.
        /// </summary>
        internal long TopologyVersion
        {
            get
            {
                RefreshNodes();

                return Interlocked.Read(ref _topVer);
            }
        }

        /// <summary>
        /// Update topology.
        /// </summary>
        /// <param name="newTopVer">New topology version.</param>
        /// <param name="newNodes">New nodes.</param>
        internal void UpdateTopology(long newTopVer, List<IClusterNode> newNodes)
        {
            lock (_topSync)
            {
                // If another thread already advanced topology version further, we still
                // can safely return currently received nodes, but we will not assign them.
                if (_topVer < newTopVer)
                {
                    // The version is read elsewhere without the lock, hence the atomic write.
                    Interlocked.Exchange(ref _topVer, newTopVer);

                    _nodes = newNodes.AsReadOnly();
                }
            }
        }

        /// <summary>
        /// Get current nodes without refreshing the topology.
        /// </summary>
        /// <returns>Current nodes.</returns>
        internal IList<IClusterNode> NodesNoRefresh()
        {
            return _nodes;
        }

        /// <summary>
        /// Creates new Cluster Group from given native projection.
        /// </summary>
        /// <param name="prj">Native projection.</param>
        /// <returns>New cluster group.</returns>
        private IClusterGroup GetClusterGroup(IUnmanagedTarget prj)
        {
            return new ClusterGroupImpl(_proc, prj, Marshaller, _ignite, _pred);
        }

        /// <summary>
        /// Refresh projection nodes.
        /// </summary>
        /// <returns>Read-only list of nodes.</returns>
        private IList<IClusterNode> RefreshNodes()
        {
            long oldTopVer = Interlocked.Read(ref _topVer);

            List<IClusterNode> newNodes = null;

            DoOutInOp(OpNodes, writer =>
            {
                writer.WriteLong(oldTopVer);
            }, input =>
            {
                PortableReaderImpl reader = Marshaller.StartUnmarshal(input);

                if (reader.ReadBoolean())
                {
                    // Topology has been updated.
                    long newTopVer = reader.ReadLong();

                    newNodes = IgniteUtils.ReadNodes(reader, _pred);

                    UpdateTopology(newTopVer, newNodes);
                }
            });

            // Hand out a read-only view: the same list may back the cached _nodes wrapper,
            // so a caller mutating it would silently corrupt the cache. This also matches
            // the "no changes" path below, which returns a read-only list as well.
            if (newNodes != null)
                return newNodes.AsReadOnly();

            // No topology changes.
            Debug.Assert(_nodes != null, "At least one topology update should have occurred.");

            return _nodes;
        }

        /// <summary>
        /// Perform synchronous out operation returning value.
        /// </summary>
        /// <param name="type">Operation type.</param>
        /// <param name="action">Action.</param>
        /// <returns>Native projection.</returns>
        private IUnmanagedTarget DoProjetionOutOp(int type, Action<PortableWriterImpl> action)
        {
            using (var stream = IgniteManager.Memory.Allocate().Stream())
            {
                var writer = Marshaller.StartMarshal(stream);

                action(writer);

                FinishMarshal(writer);

                return UU.ProjectionOutOpRet(Target, type, stream.SynchronizeOutput());
            }
        }

        /// <summary>
        /// Reads optional metrics: a boolean presence flag followed by the metrics payload.
        /// </summary>
        /// <param name="reader">Reader positioned at the presence flag.</param>
        /// <returns>Metrics, or null when the flag is false.</returns>
        private static ClusterMetricsImpl ReadMetrics(IPortableRawReader reader)
        {
            return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
        }

        /** <inheritDoc /> */
        public IPortableMetadata Metadata(int typeId)
        {
            return DoOutInOp<IPortableMetadata>(OpMetadata,
                writer =>
                {
                    writer.WriteInt(typeId);
                },
                stream =>
                {
                    PortableReaderImpl reader = Marshaller.StartUnmarshal(stream, false);

                    return reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null;
                }
            );
        }

        /// <summary>
        /// Gets metadata for all known types.
        /// </summary>
        /// <returns>List of metadata; an element is null when its presence flag is false.</returns>
        public List<IPortableMetadata> Metadata()
        {
            return DoInOp(OpAllMetadata, s =>
            {
                var reader = Marshaller.StartUnmarshal(s);

                var size = reader.ReadInt();

                var res = new List<IPortableMetadata>(size);

                for (var i = 0; i < size; i++)
                    res.Add(reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null);

                return res;
            });
        }
    }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cluster
{
    using System;
    using Apache.Ignite.Core.Cluster;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Cluster metrics implementation: an immutable snapshot populated from a raw reader.
    /// </summary>
    internal class ClusterMetricsImpl : IClusterMetrics
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="ClusterMetricsImpl"/> class.
        /// Values are consumed from the reader strictly in the order of the statements
        /// below; do not reorder them.
        /// </summary>
        /// <param name="reader">The reader.</param>
        public ClusterMetricsImpl(IPortableRawReader reader)
        {
            LastUpdateTimeRaw = reader.ReadLong();

            DateTime? lastUpdateTime0 = reader.ReadDate();

            // A missing date is mapped to default(DateTime).
            LastUpdateTime = lastUpdateTime0 ?? default(DateTime);
            MaximumActiveJobs = reader.ReadInt();
            CurrentActiveJobs = reader.ReadInt();
            AverageActiveJobs = reader.ReadFloat();
            MaximumWaitingJobs = reader.ReadInt();

            CurrentWaitingJobs = reader.ReadInt();
            AverageWaitingJobs = reader.ReadFloat();
            MaximumRejectedJobs = reader.ReadInt();
            CurrentRejectedJobs = reader.ReadInt();
            AverageRejectedJobs = reader.ReadFloat();

            TotalRejectedJobs = reader.ReadInt();
            MaximumCancelledJobs = reader.ReadInt();
            CurrentCancelledJobs = reader.ReadInt();
            AverageCancelledJobs = reader.ReadFloat();
            TotalCancelledJobs = reader.ReadInt();

            TotalExecutedJobs = reader.ReadInt();
            MaximumJobWaitTime = reader.ReadLong();
            CurrentJobWaitTime = reader.ReadLong();
            AverageJobWaitTime = reader.ReadDouble();
            MaximumJobExecuteTime = reader.ReadLong();

            CurrentJobExecuteTime = reader.ReadLong();
            AverageJobExecuteTime = reader.ReadDouble();
            TotalExecutedTasks = reader.ReadInt();
            TotalIdleTime = reader.ReadLong();
            CurrentIdleTime = reader.ReadLong();

            TotalCpus = reader.ReadInt();
            CurrentCpuLoad = reader.ReadDouble();
            AverageCpuLoad = reader.ReadDouble();
            CurrentGcCpuLoad = reader.ReadDouble();
            HeapMemoryInitialized = reader.ReadLong();

            HeapMemoryUsed = reader.ReadLong();
            HeapMemoryCommitted = reader.ReadLong();
            HeapMemoryMaximum = reader.ReadLong();
            HeapMemoryTotal = reader.ReadLong();
            NonHeapMemoryInitialized = reader.ReadLong();

            NonHeapMemoryUsed = reader.ReadLong();
            NonHeapMemoryCommitted = reader.ReadLong();
            NonHeapMemoryMaximum = reader.ReadLong();
            NonHeapMemoryTotal = reader.ReadLong();
            UpTime = reader.ReadLong();

            DateTime? startTime0 = reader.ReadDate();

            StartTime = startTime0 ?? default(DateTime);

            DateTime? nodeStartTime0 = reader.ReadDate();

            NodeStartTime = nodeStartTime0 ?? default(DateTime);

            CurrentThreadCount = reader.ReadInt();
            MaximumThreadCount = reader.ReadInt();
            TotalStartedThreadCount = reader.ReadLong();
            CurrentDaemonThreadCount = reader.ReadInt();
            LastDataVersion = reader.ReadLong();

            SentMessagesCount = reader.ReadInt();
            SentBytesCount = reader.ReadLong();
            ReceivedMessagesCount = reader.ReadInt();
            ReceivedBytesCount = reader.ReadLong();
            OutboundMessagesQueueSize = reader.ReadInt();

            TotalNodes = reader.ReadInt();
        }

        /// <summary>
        /// Last update time in raw format.
        /// </summary>
        internal long LastUpdateTimeRaw { get; set; }

        /** <inheritDoc /> */
        public DateTime LastUpdateTime { get; private set; }

        /** <inheritDoc /> */
        public int MaximumActiveJobs { get; private set; }

        /** <inheritDoc /> */
        public int CurrentActiveJobs { get; private set; }

        /** <inheritDoc /> */
        public float AverageActiveJobs { get; private set; }

        /** <inheritDoc /> */
        public int MaximumWaitingJobs { get; private set; }

        /** <inheritDoc /> */
        public int CurrentWaitingJobs { get; private set; }

        /** <inheritDoc /> */
        public float AverageWaitingJobs { get; private set; }

        /** <inheritDoc /> */
        public int MaximumRejectedJobs { get; private set; }

        /** <inheritDoc /> */
        public int CurrentRejectedJobs { get; private set; }

        /** <inheritDoc /> */
        public float AverageRejectedJobs { get; private set; }

        /** <inheritDoc /> */
        public int TotalRejectedJobs { get; private set; }

        /** <inheritDoc /> */
        public int MaximumCancelledJobs { get; private set; }

        /** <inheritDoc /> */
        public int CurrentCancelledJobs { get; private set; }

        /** <inheritDoc /> */
        public float AverageCancelledJobs { get; private set; }

        /** <inheritDoc /> */
        public int TotalCancelledJobs { get; private set; }

        /** <inheritDoc /> */
        public int TotalExecutedJobs { get; private set; }

        /** <inheritDoc /> */
        public long MaximumJobWaitTime { get; private set; }

        /** <inheritDoc /> */
        public long CurrentJobWaitTime { get; private set; }

        /** <inheritDoc /> */
        public double AverageJobWaitTime { get; private set; }

        /** <inheritDoc /> */
        public long MaximumJobExecuteTime { get; private set; }

        /** <inheritDoc /> */
        public long CurrentJobExecuteTime { get; private set; }

        /** <inheritDoc /> */
        public double AverageJobExecuteTime { get; private set; }

        /** <inheritDoc /> */
        public int TotalExecutedTasks { get; private set; }

        /** <inheritDoc /> */
        public long TotalBusyTime
        {
            // Derived value: up time minus idle time.
            get { return UpTime - TotalIdleTime; }
        }

        /** <inheritDoc /> */
        public long TotalIdleTime { get; private set; }

        /** <inheritDoc /> */
        public long CurrentIdleTime { get; private set; }

        /** <inheritDoc /> */
        public float BusyTimePercentage
        {
            // Derived value: complement of the idle fraction.
            get { return 1 - IdleTimePercentage; }
        }

        /** <inheritDoc /> */
        public float IdleTimePercentage
        {
            // Floating-point division: yields NaN or infinity (no exception) when UpTime is 0.
            get { return TotalIdleTime / (float) UpTime; }
        }

        /** <inheritDoc /> */
        public int TotalCpus { get; private set; }

        /** <inheritDoc /> */
        public double CurrentCpuLoad { get; private set; }

        /** <inheritDoc /> */
        public double AverageCpuLoad { get; private set; }

        /** <inheritDoc /> */
        public double CurrentGcCpuLoad { get; private set; }

        /** <inheritDoc /> */
        public long HeapMemoryInitialized { get; private set; }

        /** <inheritDoc /> */
        public long HeapMemoryUsed { get; private set; }

        /** <inheritDoc /> */
        public long HeapMemoryCommitted { get; private set; }

        /** <inheritDoc /> */
        public long HeapMemoryMaximum { get; private set; }

        /** <inheritDoc /> */
        public long HeapMemoryTotal { get; private set; }

        /** <inheritDoc /> */
        public long NonHeapMemoryInitialized { get; private set; }

        /** <inheritDoc /> */
        public long NonHeapMemoryUsed { get; private set; }

        /** <inheritDoc /> */
        public long NonHeapMemoryCommitted { get; private set; }

        /** <inheritDoc /> */
        public long NonHeapMemoryMaximum { get; private set; }

        /** <inheritDoc /> */
        public long NonHeapMemoryTotal { get; private set; }

        /** <inheritDoc /> */
        public long UpTime { get; private set; }

        /** <inheritDoc /> */
        public DateTime StartTime { get; private set; }

        /** <inheritDoc /> */
        public DateTime NodeStartTime { get; private set; }

        /** <inheritDoc /> */
        public int CurrentThreadCount { get; private set; }

        /** <inheritDoc /> */
        public int MaximumThreadCount { get; private set; }

        /** <inheritDoc /> */
        public long TotalStartedThreadCount { get; private set; }

        /** <inheritDoc /> */
        public int CurrentDaemonThreadCount { get; private set; }

        /** <inheritDoc /> */
        public long LastDataVersion { get; private set; }

        /** <inheritDoc /> */
        public int SentMessagesCount { get; private set; }

        /** <inheritDoc /> */
        public long SentBytesCount { get; private set; }

        /** <inheritDoc /> */
        public int ReceivedMessagesCount { get; private set; }

        /** <inheritDoc /> */
        public long ReceivedBytesCount { get; private set; }

        /** <inheritDoc /> */
        public int OutboundMessagesQueueSize { get; private set; }

        /** <inheritDoc /> */
        public int TotalNodes { get; private set; }
    }
}
b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs @@ -0,0 +1,221 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Cluster
{
    using System;
    using System.Collections.Generic;
    using Apache.Ignite.Core.Cluster;
    using Apache.Ignite.Core.Impl.Collections;
    using Apache.Ignite.Core.Impl.Common;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Cluster node implementation.
    /// <para />
    /// All node state except metrics is read once in the constructor and never changes.
    /// The metrics snapshot is refreshed on demand in <see cref="Metrics"/> once the node
    /// has been attached to a grid via <see cref="Init"/>.
    /// </summary>
    internal class ClusterNodeImpl : IClusterNode
    {
        /** Node ID. */
        private readonly Guid _id;

        /** Attributes. */
        private readonly IDictionary<string, object> _attrs;

        /** Addresses. */
        private readonly ICollection<string> _addrs;

        /** Hosts. */
        private readonly ICollection<string> _hosts;

        /** Order. */
        private readonly long _order;

        /** Local flag. */
        private readonly bool _local;

        /** Daemon flag. */
        private readonly bool _daemon;

        /** Guards replacement of the metrics snapshot (private so that no outside code can contend on it). */
        private readonly object _metricsLock = new object();

        /** Metrics; null when the reader carried no metrics for this node. */
        private volatile ClusterMetricsImpl _metrics;

        /** Ignite reference; null until Init() is called. */
        private WeakReference _igniteRef;

        /// <summary>
        /// Initializes a new instance of the <see cref="ClusterNodeImpl"/> class.
        /// <para />
        /// Values are consumed from the reader in this order: ID, attributes, addresses, hosts,
        /// order, local flag, daemon flag, and finally a boolean telling whether a metrics
        /// snapshot follows.
        /// </summary>
        /// <param name="reader">The reader.</param>
        public ClusterNodeImpl(IPortableRawReader reader)
        {
            // A missing GUID is mapped to the default (all-zero) GUID.
            _id = reader.ReadGuid() ?? default(Guid);

            _attrs = reader.ReadGenericDictionary<string, object>().AsReadOnly();
            _addrs = reader.ReadGenericCollection<string>().AsReadOnly();
            _hosts = reader.ReadGenericCollection<string>().AsReadOnly();
            _order = reader.ReadLong();
            _local = reader.ReadBoolean();
            _daemon = reader.ReadBoolean();

            _metrics = reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
        }

        /** <inheritDoc /> */
        public Guid Id
        {
            get { return _id; }
        }

        /** <inheritDoc /> */
        public T Attribute<T>(string name)
        {
            IgniteArgumentCheck.NotNull(name, "name");

            // Indexer access: an unknown name surfaces as the dictionary's own exception.
            return (T)_attrs[name];
        }

        /** <inheritDoc /> */
        public bool TryGetAttribute<T>(string name, out T attr)
        {
            IgniteArgumentCheck.NotNull(name, "name");

            object val;

            if (_attrs.TryGetValue(name, out val))
            {
                attr = (T)val;

                return true;
            }

            attr = default(T);

            return false;
        }

        /** <inheritDoc /> */
        public IDictionary<string, object> Attributes()
        {
            return _attrs;
        }

        /** <inheritDoc /> */
        public ICollection<string> Addresses
        {
            get { return _addrs; }
        }

        /** <inheritDoc /> */
        public ICollection<string> HostNames
        {
            get { return _hosts; }
        }

        /** <inheritDoc /> */
        public long Order
        {
            get { return _order; }
        }

        /** <inheritDoc /> */
        public bool IsLocal
        {
            get { return _local; }
        }

        /** <inheritDoc /> */
        public bool IsDaemon
        {
            get { return _daemon; }
        }

        /** <inheritDoc /> */
        public IClusterMetrics Metrics()
        {
            // Init() may not have been called yet, or the grid may already have been collected;
            // in both cases there is nobody to ask for fresher metrics.
            WeakReference igniteRef = _igniteRef;

            var ignite = igniteRef == null ? null : (Ignite)igniteRef.Target;

            if (ignite == null)
                return _metrics;

            ClusterMetricsImpl oldMetrics = _metrics;

            // The constructor leaves _metrics null when the reader carried no snapshot.
            // NOTE(review): 0 is assumed to mean "no metrics seen yet" for
            // RefreshClusterNodeMetrics - confirm against its implementation.
            long lastUpdateTime = oldMetrics == null ? 0 : oldMetrics.LastUpdateTimeRaw;

            ClusterMetricsImpl newMetrics = ignite.ClusterGroup.RefreshClusterNodeMetrics(_id, lastUpdateTime);

            // A null result is treated as "nothing newer available".
            if (newMetrics == null)
                return oldMetrics;

            lock (_metricsLock)
            {
                ClusterMetricsImpl current = _metrics;

                // Never replace the stored snapshot with an older one obtained by a racing caller.
                if (current == null || current.LastUpdateTime < newMetrics.LastUpdateTime)
                    _metrics = newMetrics;
            }

            return newMetrics;
        }

        /** <inheritDoc /> */
        public override string ToString()
        {
            return "GridNode [id=" + Id + ']';
        }

        /** <inheritDoc /> */
        public override bool Equals(object obj)
        {
            // Nodes are identified by ID only.
            ClusterNodeImpl node = obj as ClusterNodeImpl;

            if (node != null)
                return _id.Equals(node._id);

            return false;
        }

        /** <inheritDoc /> */
        public override int GetHashCode()
        {
            return _id.GetHashCode();
        }

        /// <summary>
        /// Initializes this instance with a grid. The grid is held through a weak reference.
        /// </summary>
        /// <param name="grid">The grid.</param>
        internal void Init(Ignite grid)
        {
            _igniteRef = new WeakReference(grid);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs new file mode 100644 index 0000000..554eb0a --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

namespace Apache.Ignite.Core.Impl.Cluster
{
    using Apache.Ignite.Core.Cluster;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Internal extension of <see cref="IClusterGroup"/> that additionally exposes
    /// portable metadata lookup by type ID.
    /// </summary>
    internal interface IClusterGroupEx : IClusterGroup
    {
        /// <summary>
        /// Gets portable metadata for the given type.
        /// </summary>
        /// <param name="typeId">Type ID.</param>
        /// <returns>Metadata.</returns>
        IPortableMetadata Metadata(int typeId);
    }
}
