This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new be15cc7c95 IGNITE-22150 .NET: Add PartitionManager API (#4062)
be15cc7c95 is described below

commit be15cc7c9504b1c735b998ea8ff376c22a505c91
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Jul 10 12:47:41 2024 +0300

    IGNITE-22150 .NET: Add PartitionManager API (#4062)
    
    * Add `ITable.PartitionManager` API (`GetPrimaryReplicas`, `GetPartition`)
    * Cache results based on known `PartitionAssignmentTimestamp`
---
 .../client/table/ClientPartitionManager.java       |   6 -
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |   2 +
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  23 +++
 .../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs  |   1 +
 .../Table/PartitionManagerTests.cs                 | 180 +++++++++++++++++
 .../dotnet/Apache.Ignite/ClientOperationType.cs    |   5 +
 .../Apache.Ignite/Internal/IgniteClientInternal.cs |   8 +-
 .../Apache.Ignite/Internal/Network/ClusterNode.cs  |  28 +++
 .../Apache.Ignite/Internal/Proto/ClientOp.cs       |   3 +
 .../Internal/Proto/ClientOpExtensions.cs           |   1 +
 .../Apache.Ignite/Internal/Table/HashPartition.cs  |  31 +++
 .../Internal/Table/PartitionManager.cs             | 224 +++++++++++++++++++++
 .../Serialization/IRecordSerializerHandler.cs      |  24 +++
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   |   5 +
 .../dotnet/Apache.Ignite/RetryReadPolicy.cs        |   1 +
 .../dotnet/Apache.Ignite/Table/IPartition.cs       |  29 +++
 .../Apache.Ignite/Table/IPartitionManager.cs       |  61 ++++++
 .../platforms/dotnet/Apache.Ignite/Table/ITable.cs |   5 +
 .../runner/app/PlatformTestNodeRunner.java         |  14 ++
 19 files changed, 638 insertions(+), 13 deletions(-)

diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
index ad0494985d..1e07c62174 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -23,7 +23,6 @@ import static org.apache.ignite.internal.client.table.ClientTupleSerializer.getP
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -85,11 +84,6 @@ public class ClientPartitionManager implements PartitionManager {
                 w -> w.out().packInt(tbl.tableId()),
                 r -> {
                     ClientMessageUnpacker in = r.in();
-
-                    if (in.tryUnpackNil()) {
-                        return Collections.<Partition, ClusterNode>emptyMap();
-                    }
-
                     int size = in.unpackInt();
 
                     Map<Partition, ClusterNode> res = new HashMap<>(size);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 03e8213d2c..ee4d5e4514 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -66,6 +66,8 @@ namespace Apache.Ignite.Tests.Compute
 
         public static readonly JobDescriptor<string, object> 
CheckedExceptionJob = new(PlatformTestNodeRunner + "$CheckedExceptionJob");
 
+        public static readonly JobDescriptor<long, int> PartitionJob = 
new(PlatformTestNodeRunner + "$PartitionJob");
+
         [Test]
         public async Task TestGetClusterNodes()
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 669b73167d..fd6d078db7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -377,6 +377,29 @@ namespace Apache.Ignite.Tests
                         Send(handler, requestId, Array.Empty<byte>());
                         continue;
                     }
+
+                    case ClientOp.PrimaryReplicasGet:
+                    {
+                        using var arrayBufferWriter = new PooledArrayBuffer();
+                        var writer = new MsgPackWriter(arrayBufferWriter);
+
+                        writer.Write(PartitionAssignment.Length);
+
+                        for (var index = 0; index < 
PartitionAssignment.Length; index++)
+                        {
+                            var nodeId = PartitionAssignment[index];
+
+                            writer.Write(index); // Partition id.
+                            writer.Write(4); // Prop count.
+                            writer.Write(nodeId); // Id.
+                            writer.Write(nodeId); // Name.
+                            writer.Write("localhost"); // Host.
+                            writer.Write(10900 + index); // Port.
+                        }
+
+                        Send(handler, requestId, arrayBufferWriter);
+                        continue;
+                    }
                 }
 
                 // Fake error message for any other op code.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index 88eb023be0..f9f21fec43 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -35,6 +35,7 @@ namespace Apache.Ignite.Tests
     public class IgniteTestsBase
     {
         protected const string TableName = "TBL1";
+        protected const int TablePartitionCount = 10;
 
         protected const string TableAllColumnsName = "TBL_ALL_COLUMNS";
         protected const string TableAllColumnsNotNullName = 
"TBL_ALL_COLUMNS_NOT_NULL";
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/PartitionManagerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/PartitionManagerTests.cs
new file mode 100644
index 0000000000..ad0514657c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/PartitionManagerTests.cs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Compute;
+using Ignite.Compute;
+using Ignite.Table;
+using Internal.Table;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="IPartitionManager"/>.
+/// </summary>
+public class PartitionManagerTests : IgniteTestsBase
+{
+    [Test]
+    public async Task TestGetPrimaryReplicas()
+    {
+        var replicas = await Table.PartitionManager.GetPrimaryReplicasAsync();
+        var replicasNodes = replicas.Values.Distinct().OrderBy(x => 
x.Address.Port).ToList();
+        var replicasPartitions = replicas.Keys.Select(x => 
((HashPartition)x).PartitionId).OrderBy(x => x).ToList();
+
+        var expectedNodes = (await Client.GetClusterNodesAsync()).OrderBy(x => 
x.Address.Port).ToList();
+
+        CollectionAssert.AreEqual(expectedNodes, replicasNodes, "Primary 
replicas should be distributed among all nodes");
+
+        CollectionAssert.AreEqual(
+            Enumerable.Range(0, TablePartitionCount),
+            replicasPartitions,
+            "Primary replicas map should have all partitions");
+    }
+
+    [Test]
+    public async Task TestGetPrimaryReplicasReturnsCachedPartitionInstances()
+    {
+        var partitions1 = await GetPartitions();
+        var partitions2 = await GetPartitions();
+
+        for (int i = 0; i < partitions1.Count; i++)
+        {
+            Assert.AreSame(partitions1[i], partitions2[i]);
+        }
+
+        async Task<List<HashPartition>> GetPartitions()
+        {
+            var replicas = await 
Table.PartitionManager.GetPrimaryReplicasAsync();
+            return replicas.Keys.Cast<HashPartition>().OrderBy(x => 
x.PartitionId).ToList();
+        }
+    }
+
+    [Test]
+    public async Task TestGetPrimaryReplica()
+    {
+        var nodes = await Client.GetClusterNodesAsync();
+
+        for (int partId = 0; partId < TablePartitionCount; partId++)
+        {
+            var partition = new HashPartition(partId);
+            var replica = await 
Table.PartitionManager.GetPrimaryReplicaAsync(partition);
+
+            CollectionAssert.Contains(nodes, replica);
+        }
+    }
+
+    [Test]
+    public void TestGetPrimaryReplicaNegativePartitionIdThrows()
+    {
+        var ex = Assert.ThrowsAsync<ArgumentException>(
+            async () => await 
Table.PartitionManager.GetPrimaryReplicaAsync(new HashPartition(-1)));
+
+        Assert.AreEqual("Partition id can't be negative: HashPartition { 
PartitionId = -1 }", ex.Message);
+    }
+
+    [Test]
+    public void TestGetPrimaryReplicaPartitionIdOutOfRangeThrows()
+    {
+        var ex = Assert.ThrowsAsync<ArgumentException>(
+            async () => await 
Table.PartitionManager.GetPrimaryReplicaAsync(new HashPartition(10)));
+
+        Assert.AreEqual("Partition id can't be greater than 9: HashPartition { 
PartitionId = 10 }", ex.Message);
+    }
+
+    [Test]
+    public void TestGetPrimaryReplicaUnknownPartitionClassThrows()
+    {
+        var ex = Assert.ThrowsAsync<ArgumentException>(
+            async () => await 
Table.PartitionManager.GetPrimaryReplicaAsync(new MyPartition()));
+
+        Assert.AreEqual($"Unsupported partition type: {typeof(MyPartition)}", 
ex.Message);
+    }
+
+    [Test]
+    public async Task TestGetPartitionForKey([Values(true, false)] bool poco)
+    {
+        var jobTarget = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+
+        for (int id = 0; id < 30; id++)
+        {
+            var partition = poco
+                ? await Table.PartitionManager.GetPartitionAsync(GetPoco(id))
+                : await Table.PartitionManager.GetPartitionAsync(GetTuple(id));
+
+            var partitionJobExec = await Client.Compute.SubmitAsync(jobTarget, 
ComputeTests.PartitionJob, id);
+            var expectedPartition = await partitionJobExec.GetResultAsync();
+
+            Assert.AreEqual(expectedPartition, 
((HashPartition)partition).PartitionId);
+        }
+    }
+
+    [Test]
+    public async Task TestGetPartitionReturnsCachedInstance()
+    {
+        var partition1 = await 
Table.PartitionManager.GetPartitionAsync(GetTuple(1));
+        var partition2 = await 
Table.PartitionManager.GetPartitionAsync(GetTuple(1));
+
+        Assert.AreSame(partition1, partition2);
+    }
+
+    [Test]
+    public async Task TestPrimaryReplicaCacheInvalidation()
+    {
+        using var server = new FakeServer
+        {
+            PartitionAssignmentTimestamp = 123,
+            PartitionAssignment = new[] { "n1", "n2" }
+        };
+
+        using var client = await server.ConnectClientAsync();
+        var table = await 
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+        var partition = new HashPartition(0);
+
+        var replica1 = await 
table!.PartitionManager.GetPrimaryReplicaAsync(partition);
+        Assert.AreEqual("n1", replica1.Name);
+
+        server.PartitionAssignmentTimestamp = 124;
+        server.PartitionAssignment = new[] { "n2", "n1" };
+
+        await client.Tables.GetTablesAsync(); // Trigger cache invalidation 
with any response.
+
+        var replica2 = await 
table.PartitionManager.GetPrimaryReplicaAsync(partition);
+        Assert.AreEqual("n2", replica2.Name);
+    }
+
+    [Test]
+    public void TestPartitionEquality()
+    {
+        IPartition part1 = new HashPartition(1);
+        IPartition part1Copy = new HashPartition(1);
+        IPartition part2 = new HashPartition(2);
+        IPartition customPart = new MyPartition();
+
+        Assert.IsTrue(part1.Equals(part1Copy));
+        Assert.IsFalse(part1.Equals(part2));
+        Assert.IsFalse(part1.Equals(customPart));
+    }
+
+    private class MyPartition : IPartition
+    {
+        public bool Equals(IPartition? other) => false;
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 3de6312be0..dd75f3fa50 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -146,6 +146,11 @@ namespace Apache.Ignite
         /// </summary>
         ComputeChangePriority,
 
+        /// <summary>
+        /// Get primary replicas (<see 
cref="IPartitionManager.GetPrimaryReplicasAsync"/>).
+        /// </summary>
+        PrimaryReplicasGet,
+
         /// <summary>
         /// Send data streamer batch (<see cref="IDataStreamerTarget{T}"/>).
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 060783ae79..a0c77eb9fc 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -88,13 +88,7 @@ namespace Apache.Ignite.Internal
 
                 for (var i = 0; i < count; i++)
                 {
-                    var fieldCount = r.ReadInt32();
-                    Debug.Assert(fieldCount == 4, "fieldCount == 4");
-
-                    res.Add(new ClusterNode(
-                        id: r.ReadString(),
-                        name: r.ReadString(),
-                        endpoint: new 
IPEndPoint(IPAddress.Parse(r.ReadString()), r.ReadInt32())));
+                    res.Add(ClusterNode.Read(ref r));
                 }
 
                 return res;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
index 8534350ae3..d9b78e374d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
@@ -18,8 +18,11 @@
 namespace Apache.Ignite.Internal.Network
 {
     using System;
+    using System.Diagnostics;
+    using System.Linq;
     using System.Net;
     using Ignite.Network;
+    using Proto.MsgPack;
 
     /// <summary>
     /// Cluster node.
@@ -74,5 +77,30 @@ namespace Apache.Ignite.Internal.Network
 
         /// <inheritdoc/>
         public override int GetHashCode() => HashCode.Combine(Id, Name, 
Address);
+
+        /// <summary>
+        /// Read node from reader.
+        /// </summary>
+        /// <param name="r">Reader.</param>
+        /// <returns>Cluster node.</returns>
+        internal static ClusterNode Read(ref MsgPackReader r)
+        {
+            var fieldCount = r.ReadInt32();
+            Debug.Assert(fieldCount == 4, "fieldCount == 4");
+
+            var id = r.ReadString();
+            var name = r.ReadString();
+            var addr = r.ReadString();
+            var port = r.ReadInt32();
+
+            // TODO IGNITE-22695 .NET: ClusterNode.Address does not support 
host names
+            var ipAddress = IPAddress.TryParse(addr, out var ip)
+                ? ip
+                : Dns.GetHostEntry(addr).AddressList.FirstOrDefault() ?? 
IPAddress.Loopback;
+
+            var endPoint = new IPEndPoint(ipAddress, port);
+
+            return new ClusterNode(id, name, endPoint);
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 31929b769b..1c7f5eb760 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -133,6 +133,9 @@ namespace Apache.Ignite.Internal.Proto
         /** Send streamer batch. */
         StreamerBatchSend = 62,
 
+        /** Get primary replicas. */
+        PrimaryReplicasGet = 65,
+
         /** Send streamer batch with receiver. */
         StreamerWithReceiverBatchSend = 66
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 749cc48a71..4fe559e99b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -69,6 +69,7 @@ namespace Apache.Ignite.Internal.Proto
                 ClientOp.PartitionAssignmentGet => null,
                 ClientOp.SqlParamMeta => null,
                 ClientOp.StreamerBatchSend => 
ClientOperationType.StreamerBatchSend,
+                ClientOp.PrimaryReplicasGet => 
ClientOperationType.PrimaryReplicasGet,
                 ClientOp.StreamerWithReceiverBatchSend => 
ClientOperationType.StreamerBatchSend,
 
                 // Do not return null from default arm intentionally so we 
don't forget to update this when new ClientOp values are added.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/HashPartition.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/HashPartition.cs
new file mode 100644
index 0000000000..e85c954cac
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/HashPartition.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table;
+
+using Ignite.Table;
+
+/// <summary>
+/// Hash partition.
+/// </summary>
+/// <param name="PartitionId">Partition id.</param>
+internal sealed record HashPartition(int PartitionId) : IPartition // Not a 
struct to avoid interface boxing.
+{
+    /// <inheritdoc/>
+    public bool Equals(IPartition? other) =>
+        other is HashPartition hashPart && Equals(hashPart);
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/PartitionManager.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/PartitionManager.cs
new file mode 100644
index 0000000000..acfce82fd3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/PartitionManager.cs
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Threading.Tasks;
+using Common;
+using Ignite.Network;
+using Ignite.Table;
+using Network;
+using Proto;
+using Proto.MsgPack;
+using Serialization;
+
+/// <summary>
+/// Table partition manager.
+/// </summary>
+internal sealed class PartitionManager : IPartitionManager
+{
+    private static readonly object PartitionsLock = new();
+
+    // Cached partition objects (HashPartition is just a wrapper around a 
number).
+    // Those wrappers implement IPartition interface and can't be structs.
+    private static volatile HashPartition[]? _partitions;
+
+    private readonly Table _table;
+
+    private readonly object _primaryReplicasLock = new();
+
+    // Cached primary replicas.
+    private volatile PrimaryReplicas? _primaryReplicas;
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="PartitionManager"/> class.
+    /// </summary>
+    /// <param name="table">Table.</param>
+    internal PartitionManager(Table table)
+    {
+        _table = table;
+    }
+
+    /// <inheritdoc/>
+    public async ValueTask<IReadOnlyDictionary<IPartition, IClusterNode>> 
GetPrimaryReplicasAsync()
+    {
+        var replicas = await 
GetPrimaryReplicasInternalAsync().ConfigureAwait(false);
+
+        return replicas.GetDictionary();
+    }
+
+    /// <inheritdoc/>
+    public async ValueTask<IClusterNode> GetPrimaryReplicaAsync(IPartition 
partition)
+    {
+        IgniteArgumentCheck.NotNull(partition);
+
+        if (partition is not HashPartition hashPartition)
+        {
+            throw new ArgumentException("Unsupported partition type: " + 
partition.GetType());
+        }
+
+        if (hashPartition.PartitionId < 0)
+        {
+            throw new ArgumentException("Partition id can't be negative: " + 
partition);
+        }
+
+        var replicas = await 
GetPrimaryReplicasInternalAsync().ConfigureAwait(false);
+        var nodes = replicas.Nodes;
+
+        if (hashPartition.PartitionId >= nodes.Length)
+        {
+            throw new ArgumentException($"Partition id can't be greater than 
{nodes.Length - 1}: {partition}");
+        }
+
+        return nodes[hashPartition.PartitionId];
+    }
+
+    /// <inheritdoc/>
+    public ValueTask<IPartition> GetPartitionAsync(IIgniteTuple tuple) =>
+        GetPartitionInternalAsync(tuple, TupleSerializerHandler.Instance);
+
+    /// <inheritdoc/>
+    public ValueTask<IPartition> GetPartitionAsync<TK>(TK key)
+        where TK : notnull =>
+        GetPartitionInternalAsync(key, 
_table.GetRecordViewInternal<TK>().RecordSerializer.Handler);
+
+    /// <inheritdoc/>
+    public override string ToString() =>
+        new IgniteToStringBuilder(GetType())
+            .Append(_table, "Table")
+            .Build();
+
+    private static HashPartition[] GetCachedPartitionArray(int count)
+    {
+        var parts = _partitions;
+        if (parts != null && parts.Length >= count)
+        {
+            return parts;
+        }
+
+        lock (PartitionsLock)
+        {
+            parts = _partitions;
+            if (parts != null && parts.Length >= count)
+            {
+                return parts;
+            }
+
+            parts = new HashPartition[count];
+            for (var i = 0; i < count; i++)
+            {
+                parts[i] = new HashPartition(i);
+            }
+
+            _partitions = parts;
+            return parts;
+        }
+    }
+
+    private async ValueTask<PrimaryReplicas> GetPrimaryReplicasInternalAsync()
+    {
+        // Socket.PartitionAssignmentTimestamp is updated on every response, 
including heartbeats,
+        // so the cache can't be stale for very long.
+        var timestamp = _table.Socket.PartitionAssignmentTimestamp;
+        var cached = _primaryReplicas;
+
+        if (cached != null && cached.Timestamp >= timestamp)
+        {
+            return cached;
+        }
+
+        using var bufferWriter = ProtoCommon.GetMessageWriter();
+        bufferWriter.MessageWriter.Write(_table.Id);
+
+        using var resBuf = await 
_table.Socket.DoOutInOpAsync(ClientOp.PrimaryReplicasGet, 
bufferWriter).ConfigureAwait(false);
+        return Read(resBuf.GetReader());
+
+        PrimaryReplicas Read(MsgPackReader r)
+        {
+            var count = r.ReadInt32();
+            var primaryReplicas = new ClusterNode[count];
+
+            for (var i = 0; i < count; i++)
+            {
+                var id = r.ReadInt32();
+                var node = ClusterNode.Read(ref r);
+
+                primaryReplicas[id] = node;
+            }
+
+            PrimaryReplicas? replicas;
+
+            lock (_primaryReplicasLock)
+            {
+                replicas = _primaryReplicas;
+
+                if (replicas == null || replicas.Timestamp < timestamp)
+                {
+                    // Got newer data - update cached replicas.
+                    replicas = new PrimaryReplicas(primaryReplicas, timestamp);
+                    _primaryReplicas = replicas;
+                }
+            }
+
+            return replicas;
+        }
+    }
+
+    private async ValueTask<IPartition> GetPartitionInternalAsync<TK>(TK key, 
IRecordSerializerHandler<TK> serializerHandler)
+    {
+        var schema = await _table.GetSchemaAsync(null).ConfigureAwait(false);
+        var colocationHash = serializerHandler.GetKeyColocationHash(schema, 
key);
+
+        var partitions = await 
GetPrimaryReplicasInternalAsync().ConfigureAwait(false);
+        var partitionsCount = partitions.Nodes.Length;
+
+        var partitionId = Math.Abs(colocationHash % partitionsCount);
+        return GetCachedPartitionArray(partitionsCount)[partitionId];
+    }
+
+    [SuppressMessage("Performance", "CA1819:Properties should not return 
arrays", Justification = "Private record.")]
+    private record PrimaryReplicas(ClusterNode[] Nodes, long Timestamp)
+    {
+        private volatile ReadOnlyDictionary<IPartition, IClusterNode>? _dict;
+
+        public ReadOnlyDictionary<IPartition, IClusterNode> GetDictionary()
+        {
+            var dict = _dict;
+
+            // Race condition here is ok, one of the threads will just 
overwrite the other's result with the same data.
+            if (dict == null)
+            {
+                var res = new Dictionary<IPartition, IClusterNode>();
+                var parts = GetCachedPartitionArray(Nodes.Length);
+
+                for (var i = 0; i < Nodes.Length; i++)
+                {
+                    res.Add(parts[i], Nodes[i]);
+                }
+
+                dict = new ReadOnlyDictionary<IPartition, IClusterNode>(res);
+                _dict = dict;
+            }
+
+            return dict;
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
index 3446f3bada..3eac886466 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
@@ -71,6 +71,30 @@ namespace Apache.Ignite.Internal.Table.Serialization
             }
         }
 
+        /// <summary>
+        /// Gets the colocation hash.
+        /// </summary>
+        /// <param name="schema">Schema.</param>
+        /// <param name="key">Key.</param>
+        /// <returns>Colocation hash.</returns>
+        int GetKeyColocationHash(Schema schema, T key)
+        {
+            var tupleBuilder = new BinaryTupleBuilder(
+                numElements: schema.KeyColumns.Length,
+                hashedColumnsPredicate: 
schema.KeyOnlyHashedColumnIndexProvider);
+
+            try
+            {
+                Write(ref tupleBuilder, key, schema, keyOnly: true, 
Span<byte>.Empty);
+
+                return tupleBuilder.GetHash();
+            }
+            finally
+            {
+                tupleBuilder.Dispose();
+            }
+        }
+
         /// <summary>
         /// Writes a record.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 196cc852b7..c72360a33c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -110,6 +110,8 @@ namespace Apache.Ignite.Internal.Table
 
             KeyValueBinaryView = new KeyValueView<IIgniteTuple, IIgniteTuple>(
                 new RecordView<KvPair<IIgniteTuple, IIgniteTuple>>(this, 
pairSerializer, _sql));
+
+            PartitionManager = new PartitionManager(this);
         }
 
         /// <inheritdoc/>
@@ -121,6 +123,9 @@ namespace Apache.Ignite.Internal.Table
         /// <inheritdoc/>
         public IKeyValueView<IIgniteTuple, IIgniteTuple> KeyValueBinaryView { 
get; }
 
+        /// <inheritdoc/>
+        public IPartitionManager PartitionManager { get; }
+
         /// <summary>
         /// Gets the associated socket.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 6f76a2fc7d..9e02579fa5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -63,6 +63,7 @@ namespace Apache.Ignite
                 ClientOperationType.ComputeGetStatus => true,
                 ClientOperationType.StreamerBatchSend => false,
                 ClientOperationType.StreamerWithReceiverBatchSend => false,
+                ClientOperationType.PrimaryReplicasGet => true,
                 var unsupported => throw new 
NotSupportedException("Unsupported operation type: " + unsupported)
             };
         }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IPartition.cs b/modules/platforms/dotnet/Apache.Ignite/Table/IPartition.cs
new file mode 100644
index 0000000000..69c374c13d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IPartition.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+/// <summary>
+/// Table partition.
+/// </summary>
+[SuppressMessage("Design", "CA1040:Avoid empty interfaces", Justification = 
"Reviewed.")]
+public interface IPartition : IEquatable<IPartition>
+{
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IPartitionManager.cs b/modules/platforms/dotnet/Apache.Ignite/Table/IPartitionManager.cs
new file mode 100644
index 0000000000..5e166b1c84
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IPartitionManager.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Network;
+
+/// <summary>
+/// Provides partition information for a table: key-to-partition mapping and primary replica lookup.
+/// </summary>
+public interface IPartitionManager
+{
+    /// <summary>
+    /// Gets the primary replica node for every partition.
+    /// <para />
+    /// NOTE: Prefer <see cref="GetPrimaryReplicaAsync"/> for performance-critical code.
+    /// </summary>
+    /// <returns>Map of partition to primary replica node.</returns>
+    ValueTask<IReadOnlyDictionary<IPartition, IClusterNode>> 
GetPrimaryReplicasAsync();
+
+    /// <summary>
+    /// Gets the primary replica node for the specified partition.
+    /// <para />
+    /// NOTE: Prefer this method over <see cref="GetPrimaryReplicasAsync"/> for performance-critical code.
+    /// </summary>
+    /// <param name="partition">Partition to look up.</param>
+    /// <returns>Cluster node that is the primary replica for the partition.</returns>
+    ValueTask<IClusterNode> GetPrimaryReplicaAsync(IPartition partition);
+
+    /// <summary>
+    /// Gets the partition for the specified table key, given as a tuple.
+    /// </summary>
+    /// <param name="tuple">Table key tuple.</param>
+    /// <returns>Partition that contains the specified key.</returns>
+    ValueTask<IPartition> GetPartitionAsync(IIgniteTuple tuple);
+
+    /// <summary>
+    /// Gets the partition for the specified table key, given as a value of type <typeparamref name="TK"/>.
+    /// </summary>
+    /// <param name="key">Table key; the type is constrained to be non-nullable.</param>
+    /// <returns>Partition that contains the specified key.</returns>
+    /// <typeparam name="TK">Key type.</typeparam>
+    ValueTask<IPartition> GetPartitionAsync<TK>(TK key)
+        where TK : notnull;
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
index 8750d55a92..61635972e3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
@@ -37,6 +37,11 @@ namespace Apache.Ignite.Table
         /// </summary>
         public IKeyValueView<IIgniteTuple, IIgniteTuple> KeyValueBinaryView { 
get; }
 
+        /// <summary>
+        /// Gets the partition manager, which provides partition information for this table.
+        /// </summary>
+        public IPartitionManager PartitionManager { get; }
+
         /// <summary>
         /// Gets the record view mapped to specified type <typeparamref 
name="T"/>.
         /// <para />
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 95bcc56adc..ad12c6ed2d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -79,6 +79,7 @@ import 
org.apache.ignite.internal.security.configuration.SecurityChange;
 import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.table.RecordBinaryViewImpl;
+import org.apache.ignite.internal.table.partition.HashPartition;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -90,6 +91,7 @@ import org.apache.ignite.table.DataStreamerReceiverContext;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -852,4 +854,16 @@ public class PlatformTestNodeRunner {
             return CompletableFuture.completedFuture(page);
         }
     }
+
+    @SuppressWarnings("unused") // Used by platform tests.
+    private static class PartitionJob implements ComputeJob<Long, Integer> {
+        /** Completes with the id of the partition that {@code partitionAsync} reports for the tuple {@code key = id}. */
+        @Override
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Long id) {
+            Table table = context.ignite().tables().table(TABLE_NAME);
+            Tuple key = Tuple.create().set("key", id);
+            // Chain on the future instead of join(): an async job should not block the thread that runs it.
+            return table.partitionManager().partitionAsync(key).thenApply((Partition p) -> ((HashPartition) p).partitionId());
+        }
+    }
 }


Reply via email to