This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 32afeaaf0 IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809) 32afeaaf0 is described below commit 32afeaaf02d8eb4d0d41f8cefb49263f52976ede Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Tue May 17 16:44:22 2022 +0300 IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809) * Implement `ExecuteColocated` in .NET client. Send requests to default node, partition awareness will be added later (IGNITE-16990). * To avoid extra table request on every `ExecuteColocated` call (we need table id and schemas), cache tables by name. If a table gets dropped and created again with the same name and a different id, retry the operation. --- .../Apache.Ignite.Tests/Compute/ComputeTests.cs | 79 +++++++++++++- .../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs | 9 +- .../RawSocketConnectionTests.cs | 1 + .../Table/RecordViewBinaryTests.cs | 6 ++ .../Table/RecordViewPocoTests.cs | 6 ++ .../Apache.Ignite.Tests/Table/TablesTests.cs | 4 +- .../Transactions/TransactionsTests.cs | 6 ++ .../dotnet/Apache.Ignite/ClientErrorCode.cs | 7 +- .../dotnet/Apache.Ignite/Compute/ICompute.cs | 25 +++++ .../dotnet/Apache.Ignite/IgniteClientException.cs | 14 +-- .../Apache.Ignite/Internal/Compute/Compute.cs | 119 ++++++++++++++++++++- .../Apache.Ignite/Internal/IgniteClientInternal.cs | 8 +- .../Apache.Ignite/Internal/Proto/ClientOp.cs | 5 +- .../Internal/Proto/ClientOpExtensions.cs | 1 + .../Apache.Ignite/Internal/Table/RecordView.cs | 5 + .../Table/Serialization/RecordSerializer.cs | 29 ++++- .../dotnet/Apache.Ignite/Internal/Table/Table.cs | 42 +++----- .../dotnet/Apache.Ignite/Internal/Table/Tables.cs | 67 +++++++----- .../runner/app/PlatformTestNodeRunner.java | 38 +++++++ 19 files changed, 395 insertions(+), 76 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs index 
35b1d38f5..1c0384c4f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs @@ -23,25 +23,33 @@ namespace Apache.Ignite.Tests.Compute using System.Net; using System.Threading.Tasks; using Ignite.Compute; + using Ignite.Table; using Internal.Network; using Network; using NUnit.Framework; + using Table; /// <summary> /// Tests <see cref="ICompute"/>. /// </summary> public class ComputeTests : IgniteTestsBase { - private const string ConcatJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ConcatJob"; + private const string ItThinClientComputeTest = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest"; - private const string NodeNameJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$NodeNameJob"; + private const string ConcatJob = ItThinClientComputeTest + "$ConcatJob"; - private const string ErrorJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ErrorJob"; + private const string NodeNameJob = ItThinClientComputeTest + "$NodeNameJob"; - private const string EchoJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$EchoJob"; + private const string ErrorJob = ItThinClientComputeTest + "$ErrorJob"; + + private const string EchoJob = ItThinClientComputeTest + "$EchoJob"; private const string PlatformTestNodeRunner = "org.apache.ignite.internal.runner.app.PlatformTestNodeRunner"; + private const string CreateTableJob = PlatformTestNodeRunner + "$CreateTableJob"; + + private const string DropTableJob = PlatformTestNodeRunner + "$DropTableJob"; + [Test] public async Task TestGetClusterNodes() { @@ -177,6 +185,69 @@ namespace Apache.Ignite.Tests.Compute } } + [Test] + [TestCase(1, "")] + [TestCase(2, "_2")] + [TestCase(3, "")] + [TestCase(5, "_2")] + public async Task TestExecuteColocated(int key, string nodeName) + { + var keyTuple = new IgniteTuple { [KeyCol] 
= key }; + var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, NodeNameJob); + + var keyPoco = new Poco { Key = key }; + var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco, NodeNameJob); + + var expectedNodeName = PlatformTestNodeRunner + nodeName; + Assert.AreEqual(expectedNodeName, resNodeName); + Assert.AreEqual(expectedNodeName, resNodeName2); + } + + [Test] + public void TestExecuteColocatedThrowsWhenTableDoesNotExist() + { + var ex = Assert.ThrowsAsync<IgniteClientException>(async () => + await Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(), EchoJob)); + + Assert.AreEqual("Table 'unknownTable' does not exist.", ex!.Message); + } + + [Test] + public void TestExecuteColocatedThrowsWhenKeyColumnIsMissing() + { + var ex = Assert.ThrowsAsync<IgniteClientException>(async () => + await Client.Compute.ExecuteColocatedAsync<string>(TableName, new IgniteTuple(), EchoJob)); + + StringAssert.Contains("Missed key column: KEY", ex!.Message); + } + + [Test] + public async Task TestExecuteColocatedUpdatesTableCacheOnTableDrop() + { + // Create table and use it in ExecuteColocated. + var nodes = await GetNodeAsync(0); + var tableName = await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, "PUB.drop-me"); + + try + { + var keyTuple = new IgniteTuple { [KeyCol] = 1 }; + var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob); + + // Drop table and create a new one with a different ID, then execute a computation again. + // This should update the cached table and complete the computation successfully. 
+ await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName); + await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, tableName); + + var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob); + + Assert.AreEqual(resNodeName, resNodeName2); + } + finally + { + await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName); + } + } + private async Task<List<IClusterNode>> GetNodeAsync(int index) => (await Client.GetClusterNodesAsync()).OrderBy(n => n.Name).Skip(index).Take(1).ToList(); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs index 68486aa07..2ed3bad6e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs @@ -18,7 +18,6 @@ namespace Apache.Ignite.Tests { using System; - using System.Linq; using System.Threading.Tasks; using Ignite.Table; using Log; @@ -30,7 +29,7 @@ namespace Apache.Ignite.Tests /// </summary> public class IgniteTestsBase { - protected const string TableName = "PUB.tbl1"; + protected const string TableName = "PUB.TBL1"; protected const string KeyCol = "key"; @@ -74,7 +73,7 @@ namespace Apache.Ignite.Tests [OneTimeTearDown] public void OneTimeTearDown() { - // ReSharper disable once ConstantConditionalAccessQualifier + // ReSharper disable once ConstantConditionalAccessQualifier, ConditionalAccessQualifierIsNonNullableAccordingToAPIContract Client?.Dispose(); Assert.Greater(_eventListener.BuffersRented, 0); @@ -83,10 +82,8 @@ namespace Apache.Ignite.Tests } [TearDown] - public async Task TearDown() + public void TearDown() { - await TupleView.DeleteAllAsync(null, Enumerable.Range(-5, 20).Select(x => GetTuple(x))); - Assert.AreEqual(_eventListener.BuffersReturned, _eventListener.BuffersRented); } diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs index 86a68ca4e..df2702cc6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs @@ -15,6 +15,7 @@ * limitations under the License. */ +// ReSharper disable MustUseReturnValue namespace Apache.Ignite.Tests { using System; diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs index 19ce6b691..5a3ad6662 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs @@ -30,6 +30,12 @@ namespace Apache.Ignite.Tests.Table /// </summary> public class RecordViewBinaryTests : IgniteTestsBase { + [TearDown] + public async Task CleanTable() + { + await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x))); + } + [Test] public async Task TestUpsertGet() { diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs index 11de0746f..0f12adaeb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs @@ -29,6 +29,12 @@ namespace Apache.Ignite.Tests.Table /// </summary> public class RecordViewPocoTests : IgniteTestsBase { + [TearDown] + public async Task CleanTable() + { + await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x))); + } + [Test] public async Task TestUpsertGet() { diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs index 
0edc3f73e..ab082298f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs @@ -33,7 +33,7 @@ namespace Apache.Ignite.Tests.Table var tables = await Client.Tables.GetTablesAsync(); Assert.AreEqual(1, tables.Count); - Assert.AreEqual("PUB.TBL1", tables[0].Name); + Assert.AreEqual(TableName, tables[0].Name); } [Test] @@ -42,7 +42,7 @@ namespace Apache.Ignite.Tests.Table var table = await Client.Tables.GetTableAsync(TableName); Assert.IsNotNull(table); - Assert.AreEqual("PUB.tbl1", table!.Name); + Assert.AreEqual(TableName, table!.Name); } [Test] diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs index 6b5038b16..7d373ced1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs @@ -28,6 +28,12 @@ namespace Apache.Ignite.Tests.Transactions /// </summary> public class TransactionsTests : IgniteTestsBase { + [TearDown] + public async Task CleanTable() + { + await TupleView.DeleteAllAsync(null, Enumerable.Range(1, 2).Select(x => GetTuple(x))); + } + [Test] public async Task TestRecordViewBinaryOperations() { diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs b/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs index 1c748b7b1..2b34b529c 100644 --- a/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs +++ b/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs @@ -35,6 +35,11 @@ namespace Apache.Ignite /// <summary> /// Authentication or authorization failure. /// </summary> - AuthFailed = 2 + AuthFailed = 2, + + /// <summary> + /// Table id does not exist. 
+ /// </summary> + TableIdDoesNotExist = 3 } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs index 09b0edd15..8c893bd6d 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Compute using System.Collections.Generic; using System.Threading.Tasks; using Network; + using Table; /// <summary> /// Ignite Compute API provides distributed job execution functionality. @@ -36,6 +37,30 @@ namespace Apache.Ignite.Compute /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args); + /// <summary> + /// Executes a job represented by the given class on one node where the given key is located. + /// </summary> + /// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param> + /// <param name="key">Table key to be used to determine the target node for job execution.</param> + /// <param name="jobClassName">Java class name of the job to execute.</param> + /// <param name="args">Job arguments.</param> + /// <typeparam name="T">Job result type.</typeparam> + /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> + Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args); + + /// <summary> + /// Executes a job represented by the given class on one node where the given key is located. 
+ /// </summary> + /// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param> + /// <param name="key">Table key to be used to determine the target node for job execution.</param> + /// <param name="jobClassName">Java class name of the job to execute.</param> + /// <param name="args">Job arguments.</param> + /// <typeparam name="T">Job result type.</typeparam> + /// <typeparam name="TKey">Key type.</typeparam> + /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> + Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args) + where TKey : class; // TODO: Remove class constraint (IGNITE-16355) + /// <summary> /// Executes a compute job represented by the given class on all of the specified nodes. /// </summary> diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs index 026262f38..29d44989d 100644 --- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs +++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs @@ -29,9 +29,6 @@ namespace Apache.Ignite /** Error code field. */ private const string ErrorCodeField = "StatusCode"; - /** Error code. */ - private readonly ClientErrorCode _errorCode; - /// <summary> /// Initializes a new instance of the <see cref="IgniteClientException"/> class. /// </summary> @@ -70,7 +67,7 @@ namespace Apache.Ignite public IgniteClientException(string message, Exception? 
cause, ClientErrorCode statusCode) : base(message, cause) { - _errorCode = statusCode; + ErrorCode = statusCode; } /// <summary> @@ -81,9 +78,14 @@ namespace Apache.Ignite protected IgniteClientException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { - _errorCode = (ClientErrorCode) info.GetInt32(ErrorCodeField); + ErrorCode = (ClientErrorCode)info.GetInt32(ErrorCodeField); } + /// <summary> + /// Gets the error code. + /// </summary> + public ClientErrorCode ErrorCode { get; } + /// <summary> /// When overridden in a derived class, sets the <see cref="SerializationInfo" /> /// with information about the exception. @@ -96,7 +98,7 @@ namespace Apache.Ignite { base.GetObjectData(info, context); - info.AddValue(ErrorCodeField, (int) _errorCode); + info.AddValue(ErrorCodeField, (int) ErrorCode); } } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs index 248818b34..0cf47f6ff 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs @@ -17,6 +17,8 @@ namespace Apache.Ignite.Internal.Compute { + using System; + using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -24,7 +26,10 @@ namespace Apache.Ignite.Internal.Compute using Common; using Ignite.Compute; using Ignite.Network; + using Ignite.Table; using Proto; + using Table; + using Table.Serialization; /// <summary> /// Compute API. @@ -34,13 +39,21 @@ namespace Apache.Ignite.Internal.Compute /** Socket. */ private readonly ClientFailoverSocket _socket; + /** Tables. */ + private readonly Tables _tables; + + /** Cached tables. */ + private readonly ConcurrentDictionary<string, Table> _tableCache = new(); + /// <summary> /// Initializes a new instance of the <see cref="Compute"/> class. 
/// </summary> /// <param name="socket">Socket.</param> - public Compute(ClientFailoverSocket socket) + /// <param name="tables">Tables.</param> + public Compute(ClientFailoverSocket socket, Tables tables) { _socket = socket; + _tables = tables; } /// <inheritdoc/> @@ -52,6 +65,27 @@ namespace Apache.Ignite.Internal.Compute return await ExecuteOnOneNode<T>(GetRandomNode(nodes), jobClassName, args).ConfigureAwait(false); } + /// <inheritdoc/> + public async Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args) => + await ExecuteColocatedAsync<T, IIgniteTuple>( + tableName, + key, + serializerHandlerFunc: _ => TupleSerializerHandler.Instance, + jobClassName, + args) + .ConfigureAwait(false); + + /// <inheritdoc/> + public async Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args) + where TKey : class => + await ExecuteColocatedAsync<T, TKey>( + tableName, + key, + serializerHandlerFunc: table => table.GetRecordViewInternal<TKey>().RecordSerializer.Handler, + jobClassName, + args) + .ConfigureAwait(false); + /// <inheritdoc/> public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args) { @@ -82,7 +116,7 @@ namespace Apache.Ignite.Internal.Compute } private static ICollection<IClusterNode> GetNodesCollection(IEnumerable<IClusterNode> nodes) => - nodes is ICollection<IClusterNode> col ? col : nodes.ToList(); + nodes as ICollection<IClusterNode> ?? 
nodes.ToList(); private async Task<T> ExecuteOnOneNode<T>(IClusterNode node, string jobClassName, object[] args) { @@ -138,5 +172,86 @@ namespace Apache.Ignite.Internal.Compute return (T)reader.ReadObjectWithType()!; } } + + private async Task<Table> GetTableAsync(string tableName) + { + if (_tableCache.TryGetValue(tableName, out var cachedTable)) + { + return cachedTable; + } + + var table = await _tables.GetTableInternalAsync(tableName).ConfigureAwait(false); + + if (table != null) + { + _tableCache[tableName] = table; + return table; + } + + _tableCache.TryRemove(tableName, out _); + + throw new IgniteClientException($"Table '{tableName}' does not exist."); + } + + private async Task<T> ExecuteColocatedAsync<T, TKey>( + string tableName, + TKey key, + Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc, + string jobClassName, + params object[] args) + where TKey : class + { + // TODO: IGNITE-16990 - implement partition awareness. + IgniteArgumentCheck.NotNull(tableName, nameof(tableName)); + IgniteArgumentCheck.NotNull(key, nameof(key)); + IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName)); + + while (true) + { + var table = await GetTableAsync(tableName).ConfigureAwait(false); + var schema = await table.GetLatestSchemaAsync().ConfigureAwait(false); + + using var bufferWriter = Write(table, schema); + + try + { + using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, bufferWriter).ConfigureAwait(false); + + return Read(res); + } + catch (IgniteClientException e) when (e.ErrorCode == ClientErrorCode.TableIdDoesNotExist) + { + // Table was dropped - remove from cache. + // Try again in case a new table with the same name exists. 
+ _tableCache.TryRemove(tableName, out _); + } + } + + PooledArrayBufferWriter Write(Table table, Schema schema) + { + var bufferWriter = new PooledArrayBufferWriter(); + var w = bufferWriter.GetMessageWriter(); + + w.Write(table.Id); + w.Write(schema.Version); + + var serializerHandler = serializerHandlerFunc(table); + serializerHandler.Write(ref w, schema, key, true); + + w.Write(jobClassName); + w.WriteObjectArrayWithTypes(args); + + w.Flush(); + + return bufferWriter; + } + + static T Read(in PooledBuffer buf) + { + var reader = buf.GetReader(); + + return (T)reader.ReadObjectWithType()!; + } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs index 20dededa1..401ebe8d3 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs @@ -44,9 +44,13 @@ namespace Apache.Ignite.Internal public IgniteClientInternal(ClientFailoverSocket socket) { _socket = socket; - Tables = new Tables(socket); + + var tables = new Tables(socket); + Tables = tables; + Transactions = new Transactions.Transactions(socket); - Compute = new Compute.Compute(socket); + + Compute = new Compute.Compute(socket, tables); } /// <inheritdoc/> diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs index b00eaa17b..72e89b0e9 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs @@ -92,6 +92,9 @@ namespace Apache.Ignite.Internal.Proto ComputeExecute = 47, /** Get cluster nodes. */ - ClusterGetNodes = 48 + ClusterGetNodes = 48, + + /** Execute compute job. 
*/ + ComputeExecuteColocated = 49 } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs index 714a3fa06..c1d65d717 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs @@ -52,6 +52,7 @@ namespace Apache.Ignite.Internal.Proto ClientOp.TupleDeleteAllExact => ClientOperationType.TupleDeleteAllExact, ClientOp.TupleGetAndDelete => ClientOperationType.TupleGetAndDelete, ClientOp.ComputeExecute => ClientOperationType.ComputeExecute, + ClientOp.ComputeExecuteColocated => ClientOperationType.ComputeExecute, ClientOp.TxBegin => null, ClientOp.TxCommit => null, ClientOp.TxRollback => null, diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs index 5f1edea60..71e568083 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs @@ -52,6 +52,11 @@ namespace Apache.Ignite.Internal.Table _ser = ser; } + /// <summary> + /// Gets the record serializer. + /// </summary> + public RecordSerializer<T> RecordSerializer => _ser; + /// <inheritdoc/> public async Task<T?> GetAsync(ITransaction? 
transaction, T key) { diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs index f30a57d5b..ee0970ee2 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs @@ -21,6 +21,7 @@ namespace Apache.Ignite.Internal.Table.Serialization using System.Collections.Generic; using Buffers; using MessagePack; + using Proto; /// <summary> /// Generic record serializer. @@ -47,6 +48,11 @@ namespace Apache.Ignite.Internal.Table.Serialization _handler = handler; } + /// <summary> + /// Gets the handler. + /// </summary> + public IRecordSerializerHandler<T> Handler => _handler; + /// <summary> /// Reads the value part. /// </summary> @@ -195,7 +201,7 @@ namespace Apache.Ignite.Internal.Table.Serialization { var w = buf.GetMessageWriter(); - _table.WriteIdAndTx(ref w, tx); + WriteIdAndTx(ref w, tx); w.Write(schema.Version); w.Flush(); @@ -236,10 +242,29 @@ namespace Apache.Ignite.Internal.Table.Serialization T rec, bool keyOnly = false) { - _table.WriteIdAndTx(ref w, tx); + WriteIdAndTx(ref w, tx); w.Write(schema.Version); _handler.Write(ref w, schema, rec, keyOnly); } + + /// <summary> + /// Writes table id and transaction id, if present. + /// </summary> + /// <param name="w">Writer.</param> + /// <param name="tx">Transaction.</param> + private void WriteIdAndTx(ref MessagePackWriter w, Transactions.Transaction? 
tx) + { + w.Write(_table.Id); + + if (tx == null) + { + w.WriteNil(); + } + else + { + w.Write(tx.Id); + } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs index 943c2d426..ff8392350 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs @@ -35,9 +35,6 @@ namespace Apache.Ignite.Internal.Table /** Socket. */ private readonly ClientFailoverSocket _socket; - /** Table id. */ - private readonly Guid _id; - /** Schemas. */ private readonly ConcurrentDictionary<int, Schema> _schemas = new(); @@ -60,7 +57,7 @@ namespace Apache.Ignite.Internal.Table { _socket = socket; Name = name; - _id = id; + Id = id; RecordBinaryView = new RecordView<IIgniteTuple>( this, @@ -78,35 +75,30 @@ namespace Apache.Ignite.Internal.Table /// </summary> internal ClientFailoverSocket Socket => _socket; + /// <summary> + /// Gets the table id. + /// </summary> + internal Guid Id { get; } + /// <inheritdoc/> public IRecordView<T> GetRecordView<T>() + where T : class => + GetRecordViewInternal<T>(); + + /// <summary> + /// Gets the record view for the specified type. + /// </summary> + /// <typeparam name="T">Record type.</typeparam> + /// <returns>Record view.</returns> + internal RecordView<T> GetRecordViewInternal<T>() where T : class { // ReSharper disable once HeapView.CanAvoidClosure (generics prevent this) - return (IRecordView<T>)_recordViews.GetOrAdd( + return (RecordView<T>)_recordViews.GetOrAdd( typeof(T), _ => new RecordView<T>(this, new RecordSerializer<T>(this, new ObjectSerializerHandler<T>()))); } - /// <summary> - /// Writes the transaction id, if present. - /// </summary> - /// <param name="w">Writer.</param> - /// <param name="tx">Transaction.</param> - internal void WriteIdAndTx(ref MessagePackWriter w, Transactions.Transaction? 
tx) - { - w.Write(_id); - - if (tx == null) - { - w.WriteNil(); - } - else - { - w.Write(tx.Id); - } - } - /// <summary> /// Reads the schema. /// </summary> @@ -168,7 +160,7 @@ namespace Apache.Ignite.Internal.Table void Write() { var w = writer.GetMessageWriter(); - w.Write(_id); + w.Write(Id); if (version == null) { diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs index b8392099b..ef8ddd8f0 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs @@ -35,8 +35,8 @@ namespace Apache.Ignite.Internal.Table /** Socket. */ private readonly ClientFailoverSocket _socket; - /** Cached tables. */ - private readonly ConcurrentDictionary<Guid, ITable> _tables = new(); + /** Cached tables. Caching here is required to retain schema and serializer caches in <see cref="Table"/>. */ + private readonly ConcurrentDictionary<Guid, Table> _tables = new(); /// <summary> /// Initializes a new instance of the <see cref="Tables"/> class. @@ -50,28 +50,7 @@ namespace Apache.Ignite.Internal.Table /// <inheritdoc/> public async Task<ITable?> GetTableAsync(string name) { - IgniteArgumentCheck.NotNull(name, nameof(name)); - - using var writer = new PooledArrayBufferWriter(); - Write(writer.GetMessageWriter()); - - using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false); - return Read(resBuf.GetReader()); - - void Write(MessagePackWriter w) - { - w.Write(name); - w.Flush(); - } - - // ReSharper disable once LambdaExpressionMustBeStatic (requires .NET 5+) - ITable? Read(MessagePackReader r) => - r.NextMessagePackType == MessagePackType.Nil - ? 
null - : _tables.GetOrAdd( - r.ReadGuid(), - (Guid id, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id, arg.Socket), - (name, _socket)); + return await GetTableInternalAsync(name).ConfigureAwait(false); } /// <inheritdoc/> @@ -90,11 +69,49 @@ namespace Apache.Ignite.Internal.Table { var id = r.ReadGuid(); var name = r.ReadString(); - res.Add(new Table(name, id, _socket)); + + // ReSharper disable once LambdaExpressionMustBeStatic (not supported by .NET Core 3.1, TODO IGNITE-16994) + var table = _tables.GetOrAdd( + id, + (Guid id0, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id0, arg.Socket), + (name, _socket)); + + res.Add(table); } return res; } } + + /// <summary> + /// Gets the table by name. + /// </summary> + /// <param name="name">Name.</param> + /// <returns>Table.</returns> + internal async Task<Table?> GetTableInternalAsync(string name) + { + IgniteArgumentCheck.NotNull(name, nameof(name)); + + using var writer = new PooledArrayBufferWriter(); + Write(writer.GetMessageWriter()); + + using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false); + return Read(resBuf.GetReader()); + + void Write(MessagePackWriter w) + { + w.Write(name); + w.Flush(); + } + + // ReSharper disable once LambdaExpressionMustBeStatic (requires .NET 5+) + Table? Read(MessagePackReader r) => + r.NextMessagePackType == MessagePackType.Nil + ? 
null + : _tables.GetOrAdd( + r.ReadGuid(), + (Guid id, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id, arg.Socket), + (name, _socket)); + } } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java index a0acb8e5c..4e14c9e8e 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java @@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.schema.SchemaBuilders; import org.apache.ignite.schema.definition.ColumnType; import org.apache.ignite.schema.definition.TableDefinition; +import org.apache.ignite.table.Table; /** * Helper class for non-Java platform tests (.NET, C++, Python, ...). Starts nodes, populates tables and data for tests. @@ -150,4 +153,39 @@ public class PlatformTestNodeRunner { private static int getPort(IgniteImpl node) { return node.clientAddress().port(); } + + /** + * Compute job that creates a table. + */ + @SuppressWarnings({"unused"}) // Used by platform tests. + private static class CreateTableJob implements ComputeJob<String> { + @Override + public String execute(JobExecutionContext context, Object... 
args) { + String tableName = (String) args[0]; + + Table table = context.ignite().tables().createTable( + tableName, + tblChanger -> tblChanger + .changeColumns(cols -> + cols.create("key", col -> col.changeType(t -> t.changeType("INT64")).changeNullable(false))) + .changePrimaryKey(pk -> pk.changeColumns("key").changeColocationColumns("key")) + ); + + return table.name(); + } + } + + /** + * Compute job that drops a table. + */ + @SuppressWarnings({"unused"}) // Used by platform tests. + private static class DropTableJob implements ComputeJob<String> { + @Override + public String execute(JobExecutionContext context, Object... args) { + String tableName = (String) args[0]; + context.ignite().tables().dropTable(tableName); + + return tableName; + } + } }