IGNITE-6279 .NET: Decouple AbstractQueryCursor from PlatformTarget This closes #2598
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e66b98b4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e66b98b4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e66b98b4 Branch: refs/heads/ignite-3484 Commit: e66b98b419a90ecbb255d07e17f184c1834837bd Parents: 9d10d20 Author: Pavel Tupitsyn <[email protected]> Authored: Wed Sep 6 16:24:41 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Wed Sep 6 16:24:41 2017 +0300 ---------------------------------------------------------------------- .../Apache.Ignite.Core.csproj | 3 +- .../Impl/Cache/Query/AbstractQueryCursor.cs | 264 ----------------- .../Impl/Cache/Query/FieldsQueryCursor.cs | 2 +- .../Impl/Cache/Query/PlatformQueryQursorBase.cs | 84 ++++++ .../Impl/Cache/Query/QueryCursor.cs | 2 +- .../Impl/Cache/Query/QueryCursorBase.cs | 288 +++++++++++++++++++ 6 files changed, 376 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e66b98b4/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index dd40156..0a100f7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -100,6 +100,7 @@ <Compile Include="Common\ExceptionFactory.cs" /> <Compile Include="Configuration\Package-Info.cs" /> <Compile Include="Impl\Binary\BinaryTypeId.cs" /> + <Compile Include="Impl\Cache\Query\PlatformQueryQursorBase.cs" /> <Compile Include="Impl\IIgniteInternal.cs" /> <Compile Include="Impl\IPlatformTargetInternal.cs" /> <Compile Include="Impl\PersistentStore\PersistentStoreMetrics.cs" /> @@ -321,7 +322,7 @@ <Compile Include="Impl\Cache\Event\CacheEntryUpdateEvent.cs" /> <Compile Include="Impl\Cache\ICacheInternal.cs" /> <Compile Include="Impl\Cache\MutableCacheEntry.cs" /> - <Compile Include="Impl\Cache\Query\AbstractQueryCursor.cs" /> + <Compile Include="Impl\Cache\Query\QueryCursorBase.cs" /> <Compile Include="Impl\Cache\Query\Continuous\ContinuousQueryFilter.cs" /> <Compile Include="Impl\Cache\Query\Continuous\ContinuousQueryFilterHolder.cs" /> <Compile Include="Impl\Cache\Query\Continuous\ContinuousQueryHandleImpl.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/e66b98b4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs deleted file mode 100644 index 8e4985e..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache.Query -{ - using System; - using System.Collections; - using System.Collections.Generic; - using Apache.Ignite.Core.Cache.Query; - using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Binary.IO; - - /// <summary> - /// Abstract query cursor implementation. - /// </summary> - internal abstract class AbstractQueryCursor<T> : PlatformDisposableTargetAdapter, IQueryCursor<T>, IEnumerator<T> - { - /** */ - private const int OpGetAll = 1; - - /** */ - private const int OpGetBatch = 2; - - /** */ - private const int OpIterator = 4; - - /** */ - private const int OpIteratorClose = 5; - - /** Position before head. */ - private const int BatchPosBeforeHead = -1; - - /** Keep binary flag. */ - private readonly bool _keepBinary; - - /** Wherther "GetAll" was called. */ - private bool _getAllCalled; - - /** Whether "GetEnumerator" was called. */ - private bool _iterCalled; - - /** Batch with entries. */ - private T[] _batch; - - /** Current position in batch. */ - private int _batchPos = BatchPosBeforeHead; - - /// <summary> - /// Constructor. - /// </summary> - /// <param name="target">Target.</param> - /// <param name="keepBinary">Keep binary flag.</param> - protected AbstractQueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target) - { - _keepBinary = keepBinary; - } - - #region Public methods - - /** <inheritdoc /> */ - public IList<T> GetAll() - { - ThrowIfDisposed(); - - if (_iterCalled) - throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " + - "method has already been called."); - - if (_getAllCalled) - throw new InvalidOperationException("Failed to get all entries because GetAll() " + - "method has already been called."); - - var res = DoInOp(OpGetAll, ConvertGetAll); - - _getAllCalled = true; - - return res; - } - - /** <inheritdoc /> */ - protected override void Dispose(bool disposing) - { - try - { - DoOutInOp(OpIteratorClose); - } - finally - { - base.Dispose(disposing); - } - } - - #endregion - - #region Public IEnumerable methods - - /** <inheritdoc /> */ - public IEnumerator<T> GetEnumerator() - { - ThrowIfDisposed(); - - if (_iterCalled) - throw new InvalidOperationException("Failed to get enumerator entries because " + - "GetEnumerator() method has already been called."); - - if (_getAllCalled) - throw new InvalidOperationException("Failed to get enumerator entries because " + - "GetAll() method has already been called."); - - DoOutInOp(OpIterator); - - _iterCalled = true; - - return this; - } - - /** <inheritdoc /> */ - IEnumerator IEnumerable.GetEnumerator() - { - return GetEnumerator(); - } - - #endregion - - #region Public IEnumerator methods - - /** <inheritdoc /> */ - public T Current - { - get - { - ThrowIfDisposed(); - - if (_batchPos == BatchPosBeforeHead) - throw new InvalidOperationException("MoveNext has not been called."); - - if (_batch == null) - throw new InvalidOperationException("Previous call to MoveNext returned false."); - - return _batch[_batchPos]; - } - } - - /** <inheritdoc /> */ - object IEnumerator.Current - { - get { return Current; } - } - - /** <inheritdoc /> */ - public bool MoveNext() - { - ThrowIfDisposed(); - - if (_batch == null) - { - if (_batchPos == BatchPosBeforeHead) - // Standing before head, let's get batch and advance position. - RequestBatch(); - } - else - { - _batchPos++; - - if (_batch.Length == _batchPos) - // Reached batch end => request another. - RequestBatch(); - } - - return _batch != null; - } - - /** <inheritdoc /> */ - public void Reset() - { - throw new NotSupportedException("Reset is not supported."); - } - - #endregion - - #region Non-public methods - - /// <summary> - /// Read entry from the reader. - /// </summary> - /// <param name="reader">Reader.</param> - /// <returns>Entry.</returns> - protected abstract T Read(BinaryReader reader); - - /** <inheritdoc /> */ - protected override T1 Unmarshal<T1>(IBinaryStream stream) - { - return Marshaller.Unmarshal<T1>(stream, _keepBinary); - } - - /// <summary> - /// Request next batch. - /// </summary> - private void RequestBatch() - { - _batch = DoInOp(OpGetBatch, ConvertGetBatch); - - _batchPos = 0; - } - - /// <summary> - /// Converter for GET_ALL operation. - /// </summary> - /// <param name="stream">Stream.</param> - /// <returns>Result.</returns> - private IList<T> ConvertGetAll(IBinaryStream stream) - { - var reader = Marshaller.StartUnmarshal(stream, _keepBinary); - - var size = reader.ReadInt(); - - var res = new List<T>(size); - - for (var i = 0; i < size; i++) - res.Add(Read(reader)); - - return res; - } - - /// <summary> - /// Converter for GET_BATCH operation. - /// </summary> - /// <param name="stream">Stream.</param> - /// <returns>Result.</returns> - private T[] ConvertGetBatch(IBinaryStream stream) - { - var reader = Marshaller.StartUnmarshal(stream, _keepBinary); - - var size = reader.ReadInt(); - - if (size == 0) - return null; - - var res = new T[size]; - - for (var i = 0; i < size; i++) - res[i] = Read(reader); - - return res; - } - - #endregion - - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e66b98b4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs index 9d021dc..17dc93b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs @@ -26,7 +26,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// <summary> /// Cursor for entry-based queries. /// </summary> - internal class FieldsQueryCursor<T> : AbstractQueryCursor<T> + internal class FieldsQueryCursor<T> : PlatformQueryQursorBase<T> { /** */ private readonly Func<IBinaryRawReader, int, T> _readerFunc; http://git-wip-us.apache.org/repos/asf/ignite/blob/e66b98b4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/PlatformQueryQursorBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/PlatformQueryQursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/PlatformQueryQursorBase.cs new file mode 100644 index 0000000..8a51dab --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/PlatformQueryQursorBase.cs @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Query +{ + using System.Collections.Generic; + + /// <summary> + /// Base for platform cursors. + /// </summary> + internal abstract class PlatformQueryQursorBase<T> : QueryCursorBase<T> + { + /** */ + private readonly IPlatformTargetInternal _target; + + /** */ + private const int OpGetAll = 1; + + /** */ + private const int OpGetBatch = 2; + + /** */ + private const int OpIterator = 4; + + /** */ + private const int OpIteratorClose = 5; + + /// <summary> + /// Initializes a new instance of the <see cref="PlatformQueryQursorBase{T}"/> class. + /// </summary> + /// <param name="target">The target.</param> + /// <param name="keepBinary">Keep binary flag.</param> + protected PlatformQueryQursorBase(IPlatformTargetInternal target, bool keepBinary) + : base(target.Marshaller, keepBinary) + { + _target = target; + } + + /** <inheritdoc /> */ + protected override IList<T> GetAllInternal() + { + return _target.OutStream(OpGetAll, ConvertGetAll); + } + + /** <inheritdoc /> */ + protected override void InitIterator() + { + _target.InLongOutLong(OpIterator, 0); + } + + /** <inheritdoc /> */ + protected override T[] GetBatch() + { + return _target.OutStream(OpGetBatch, ConvertGetBatch); + } + + /** <inheritdoc /> */ + protected override void Dispose(bool disposing) + { + try + { + _target.InLongOutLong(OpIteratorClose, 0); + } + finally + { + base.Dispose(disposing); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e66b98b4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs index bc3cdb6..b967d6a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs @@ -24,7 +24,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// <summary> /// Cursor for entry-based queries. /// </summary> - internal class QueryCursor<TK, TV> : AbstractQueryCursor<ICacheEntry<TK, TV>> + internal class QueryCursor<TK, TV> : PlatformQueryQursorBase<ICacheEntry<TK, TV>> { /// <summary> /// Constructor. http://git-wip-us.apache.org/repos/asf/ignite/blob/e66b98b4/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs new file mode 100644 index 0000000..418bb24 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Query +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Cache.Query; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + + /// <summary> + /// Abstract query cursor implementation. + /// </summary> + internal abstract class QueryCursorBase<T> : IQueryCursor<T>, IEnumerator<T> + { + /** Position before head. */ + private const int BatchPosBeforeHead = -1; + + /** Keep binary flag. */ + private readonly bool _keepBinary; + + /** Marshaller. */ + private readonly Marshaller _marsh; + + /** Wherther "GetAll" was called. */ + private bool _getAllCalled; + + /** Whether "GetEnumerator" was called. */ + private bool _iterCalled; + + /** Batch with entries. */ + private T[] _batch; + + /** Current position in batch. */ + private int _batchPos = BatchPosBeforeHead; + + /** Disposed flag. */ + private volatile bool _disposed; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="marsh">Marshaller.</param> + /// <param name="keepBinary">Keep binary flag.</param> + protected QueryCursorBase(Marshaller marsh, bool keepBinary) + { + Debug.Assert(marsh != null); + + _keepBinary = keepBinary; + _marsh = marsh; + } + + /** <inheritdoc /> */ + public IList<T> GetAll() + { + ThrowIfDisposed(); + + if (_iterCalled) + throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " + + "method has already been called."); + + if (_getAllCalled) + throw new InvalidOperationException("Failed to get all entries because GetAll() " + + "method has already been called."); + + var res = GetAllInternal(); + + _getAllCalled = true; + + return res; + } + + #region Public IEnumerable methods + + /** <inheritdoc /> */ + public IEnumerator<T> GetEnumerator() + { + ThrowIfDisposed(); + + if (_iterCalled) + { + throw new InvalidOperationException("Failed to get enumerator entries because " + + "GetEnumerator() method has already been called."); + } + + if (_getAllCalled) + { + throw new InvalidOperationException("Failed to get enumerator entries because " + + "GetAll() method has already been called."); + } + + InitIterator(); + + _iterCalled = true; + + return this; + } + + protected abstract void InitIterator(); + + /** <inheritdoc /> */ + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + #endregion + + #region Public IEnumerator methods + + /** <inheritdoc /> */ + public T Current + { + get + { + ThrowIfDisposed(); + + if (_batchPos == BatchPosBeforeHead) + throw new InvalidOperationException("MoveNext has not been called."); + + if (_batch == null) + throw new InvalidOperationException("Previous call to MoveNext returned false."); + + return _batch[_batchPos]; + } + } + + /** <inheritdoc /> */ + object IEnumerator.Current + { + get { return Current; } + } + + /** <inheritdoc /> */ + public bool MoveNext() + { + ThrowIfDisposed(); + + if (_batch == null) + { + if (_batchPos == BatchPosBeforeHead) + // Standing before head, let's get batch and advance position. + RequestBatch(); + } + else + { + _batchPos++; + + if (_batch.Length == _batchPos) + // Reached batch end => request another. + RequestBatch(); + } + + return _batch != null; + } + + /** <inheritdoc /> */ + public void Reset() + { + throw new NotSupportedException("Reset is not supported."); + } + + #endregion + + /// <summary> + /// Gets all entries. + /// </summary> + protected abstract IList<T> GetAllInternal(); + + /// <summary> + /// Reads entry from the reader. + /// </summary> + /// <param name="reader">Reader.</param> + /// <returns>Entry.</returns> + protected abstract T Read(BinaryReader reader); + + /// <summary> + /// Requests next batch. + /// </summary> + private void RequestBatch() + { + _batch = GetBatch(); + + _batchPos = 0; + } + + /// <summary> + /// Gets the next batch. + /// </summary> + protected abstract T[] GetBatch(); + + /// <summary> + /// Converter for GET_ALL operation. + /// </summary> + /// <param name="stream">Stream.</param> + /// <returns>Result.</returns> + protected IList<T> ConvertGetAll(IBinaryStream stream) + { + var reader = _marsh.StartUnmarshal(stream, _keepBinary); + + var size = reader.ReadInt(); + + var res = new List<T>(size); + + for (var i = 0; i < size; i++) + res.Add(Read(reader)); + + return res; + } + + /// <summary> + /// Converter for GET_BATCH operation. + /// </summary> + /// <param name="stream">Stream.</param> + /// <returns>Result.</returns> + protected T[] ConvertGetBatch(IBinaryStream stream) + { + var reader = _marsh.StartUnmarshal(stream, _keepBinary); + + var size = reader.ReadInt(); + + if (size == 0) + return null; + + var res = new T[size]; + + for (var i = 0; i < size; i++) + res[i] = Read(reader); + + return res; + } + + /** <inheritdoc /> */ + public void Dispose() + { + lock (this) + { + if (_disposed) + return; + + Dispose(true); + + GC.SuppressFinalize(this); + + _disposed = true; + } + } + + /// <summary> + /// Releases unmanaged and - optionally - managed resources. + /// </summary> + /// <param name="disposing"> + /// <c>true</c> when called from Dispose; <c>false</c> when called from finalizer. + /// </param> + protected virtual void Dispose(bool disposing) + { + // No-op. + } + + /// <summary> + /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed. + /// </summary> + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(GetType().Name, "Object has been disposed."); + } + } + } +}
