This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new be15cc7c95 IGNITE-22150 .NET: Add PartitionManager API (#4062)
be15cc7c95 is described below
commit be15cc7c9504b1c735b998ea8ff376c22a505c91
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Jul 10 12:47:41 2024 +0300
IGNITE-22150 .NET: Add PartitionManager API (#4062)
* Add `ITable.PartitionManager` API (`GetPrimaryReplicas`, `GetPartition`)
* Cache results based on known `PartitionAssignmentTimestamp`
---
.../client/table/ClientPartitionManager.java | 6 -
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 2 +
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 23 +++
.../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs | 1 +
.../Table/PartitionManagerTests.cs | 180 +++++++++++++++++
.../dotnet/Apache.Ignite/ClientOperationType.cs | 5 +
.../Apache.Ignite/Internal/IgniteClientInternal.cs | 8 +-
.../Apache.Ignite/Internal/Network/ClusterNode.cs | 28 +++
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 3 +
.../Internal/Proto/ClientOpExtensions.cs | 1 +
.../Apache.Ignite/Internal/Table/HashPartition.cs | 31 +++
.../Internal/Table/PartitionManager.cs | 224 +++++++++++++++++++++
.../Serialization/IRecordSerializerHandler.cs | 24 +++
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 5 +
.../dotnet/Apache.Ignite/RetryReadPolicy.cs | 1 +
.../dotnet/Apache.Ignite/Table/IPartition.cs | 29 +++
.../Apache.Ignite/Table/IPartitionManager.cs | 61 ++++++
.../platforms/dotnet/Apache.Ignite/Table/ITable.cs | 5 +
.../runner/app/PlatformTestNodeRunner.java | 14 ++
19 files changed, 638 insertions(+), 13 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
index ad0494985d..1e07c62174 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.client.table.ClientTupleSerializer.getP
import java.time.Instant;
import java.time.temporal.ChronoUnit;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -85,11 +84,6 @@ public class ClientPartitionManager implements
PartitionManager {
w -> w.out().packInt(tbl.tableId()),
r -> {
ClientMessageUnpacker in = r.in();
-
- if (in.tryUnpackNil()) {
- return Collections.<Partition, ClusterNode>emptyMap();
- }
-
int size = in.unpackInt();
Map<Partition, ClusterNode> res = new HashMap<>(size);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 03e8213d2c..ee4d5e4514 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -66,6 +66,8 @@ namespace Apache.Ignite.Tests.Compute
public static readonly JobDescriptor<string, object>
CheckedExceptionJob = new(PlatformTestNodeRunner + "$CheckedExceptionJob");
+ public static readonly JobDescriptor<long, int> PartitionJob =
new(PlatformTestNodeRunner + "$PartitionJob");
+
[Test]
public async Task TestGetClusterNodes()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 669b73167d..fd6d078db7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -377,6 +377,29 @@ namespace Apache.Ignite.Tests
Send(handler, requestId, Array.Empty<byte>());
continue;
}
+
+ case ClientOp.PrimaryReplicasGet:
+ {
+ using var arrayBufferWriter = new PooledArrayBuffer();
+ var writer = new MsgPackWriter(arrayBufferWriter);
+
+ writer.Write(PartitionAssignment.Length);
+
+ for (var index = 0; index <
PartitionAssignment.Length; index++)
+ {
+ var nodeId = PartitionAssignment[index];
+
+ writer.Write(index); // Partition id.
+ writer.Write(4); // Prop count.
+ writer.Write(nodeId); // Id.
+ writer.Write(nodeId); // Name.
+ writer.Write("localhost"); // Host.
+ writer.Write(10900 + index); // Port.
+ }
+
+ Send(handler, requestId, arrayBufferWriter);
+ continue;
+ }
}
// Fake error message for any other op code.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index 88eb023be0..f9f21fec43 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -35,6 +35,7 @@ namespace Apache.Ignite.Tests
public class IgniteTestsBase
{
protected const string TableName = "TBL1";
+ protected const int TablePartitionCount = 10;
protected const string TableAllColumnsName = "TBL_ALL_COLUMNS";
protected const string TableAllColumnsNotNullName =
"TBL_ALL_COLUMNS_NOT_NULL";
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/PartitionManagerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/PartitionManagerTests.cs
new file mode 100644
index 0000000000..ad0514657c
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/PartitionManagerTests.cs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Compute;
+using Ignite.Compute;
+using Ignite.Table;
+using Internal.Table;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="IPartitionManager"/>.
+/// </summary>
+public class PartitionManagerTests : IgniteTestsBase
+{
+ [Test]
+ public async Task TestGetPrimaryReplicas()
+ {
+ var replicas = await Table.PartitionManager.GetPrimaryReplicasAsync();
+ var replicasNodes = replicas.Values.Distinct().OrderBy(x =>
x.Address.Port).ToList();
+ var replicasPartitions = replicas.Keys.Select(x =>
((HashPartition)x).PartitionId).OrderBy(x => x).ToList();
+
+ var expectedNodes = (await Client.GetClusterNodesAsync()).OrderBy(x =>
x.Address.Port).ToList();
+
+ CollectionAssert.AreEqual(expectedNodes, replicasNodes, "Primary
replicas should be distributed among all nodes");
+
+ CollectionAssert.AreEqual(
+ Enumerable.Range(0, TablePartitionCount),
+ replicasPartitions,
+ "Primary replicas map should have all partitions");
+ }
+
+ [Test]
+ public async Task TestGetPrimaryReplicasReturnsCachedPartitionInstances()
+ {
+ var partitions1 = await GetPartitions();
+ var partitions2 = await GetPartitions();
+
+ for (int i = 0; i < partitions1.Count; i++)
+ {
+ Assert.AreSame(partitions1[i], partitions2[i]);
+ }
+
+ async Task<List<HashPartition>> GetPartitions()
+ {
+ var replicas = await
Table.PartitionManager.GetPrimaryReplicasAsync();
+ return replicas.Keys.Cast<HashPartition>().OrderBy(x =>
x.PartitionId).ToList();
+ }
+ }
+
+ [Test]
+ public async Task TestGetPrimaryReplica()
+ {
+ var nodes = await Client.GetClusterNodesAsync();
+
+ for (int partId = 0; partId < TablePartitionCount; partId++)
+ {
+ var partition = new HashPartition(partId);
+ var replica = await
Table.PartitionManager.GetPrimaryReplicaAsync(partition);
+
+ CollectionAssert.Contains(nodes, replica);
+ }
+ }
+
+ [Test]
+ public void TestGetPrimaryReplicaNegativePartitionIdThrows()
+ {
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await
Table.PartitionManager.GetPrimaryReplicaAsync(new HashPartition(-1)));
+
+ Assert.AreEqual("Partition id can't be negative: HashPartition {
PartitionId = -1 }", ex.Message);
+ }
+
+ [Test]
+ public void TestGetPrimaryReplicaPartitionIdOutOfRangeThrows()
+ {
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await
Table.PartitionManager.GetPrimaryReplicaAsync(new HashPartition(10)));
+
+ Assert.AreEqual("Partition id can't be greater than 9: HashPartition {
PartitionId = 10 }", ex.Message);
+ }
+
+ [Test]
+ public void TestGetPrimaryReplicaUnknownPartitionClassThrows()
+ {
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await
Table.PartitionManager.GetPrimaryReplicaAsync(new MyPartition()));
+
+ Assert.AreEqual($"Unsupported partition type: {typeof(MyPartition)}",
ex.Message);
+ }
+
+ [Test]
+ public async Task TestGetPartitionForKey([Values(true, false)] bool poco)
+ {
+ var jobTarget = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+
+ for (int id = 0; id < 30; id++)
+ {
+ var partition = poco
+ ? await Table.PartitionManager.GetPartitionAsync(GetPoco(id))
+ : await Table.PartitionManager.GetPartitionAsync(GetTuple(id));
+
+ var partitionJobExec = await Client.Compute.SubmitAsync(jobTarget,
ComputeTests.PartitionJob, id);
+ var expectedPartition = await partitionJobExec.GetResultAsync();
+
+ Assert.AreEqual(expectedPartition,
((HashPartition)partition).PartitionId);
+ }
+ }
+
+ [Test]
+ public async Task TestGetPartitionReturnsCachedInstance()
+ {
+ var partition1 = await
Table.PartitionManager.GetPartitionAsync(GetTuple(1));
+ var partition2 = await
Table.PartitionManager.GetPartitionAsync(GetTuple(1));
+
+ Assert.AreSame(partition1, partition2);
+ }
+
+ [Test]
+ public async Task TestPrimaryReplicaCacheInvalidation()
+ {
+ using var server = new FakeServer
+ {
+ PartitionAssignmentTimestamp = 123,
+ PartitionAssignment = new[] { "n1", "n2" }
+ };
+
+ using var client = await server.ConnectClientAsync();
+ var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ var partition = new HashPartition(0);
+
+ var replica1 = await
table!.PartitionManager.GetPrimaryReplicaAsync(partition);
+ Assert.AreEqual("n1", replica1.Name);
+
+ server.PartitionAssignmentTimestamp = 124;
+ server.PartitionAssignment = new[] { "n2", "n1" };
+
+ await client.Tables.GetTablesAsync(); // Trigger cache invalidation with any response.
+
+ var replica2 = await
table.PartitionManager.GetPrimaryReplicaAsync(partition);
+ Assert.AreEqual("n2", replica2.Name);
+ }
+
+ [Test]
+ public void TestPartitionEquality()
+ {
+ IPartition part1 = new HashPartition(1);
+ IPartition part1Copy = new HashPartition(1);
+ IPartition part2 = new HashPartition(2);
+ IPartition customPart = new MyPartition();
+
+ Assert.IsTrue(part1.Equals(part1Copy));
+ Assert.IsFalse(part1.Equals(part2));
+ Assert.IsFalse(part1.Equals(customPart));
+ }
+
+ private class MyPartition : IPartition
+ {
+ public bool Equals(IPartition? other) => false;
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 3de6312be0..dd75f3fa50 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -146,6 +146,11 @@ namespace Apache.Ignite
/// </summary>
ComputeChangePriority,
+ /// <summary>
+ /// Get primary replicas (<see cref="IPartitionManager.GetPrimaryReplicasAsync"/>).
+ /// </summary>
+ PrimaryReplicasGet,
+
/// <summary>
/// Send data streamer batch (<see cref="IDataStreamerTarget{T}"/>).
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 060783ae79..a0c77eb9fc 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -88,13 +88,7 @@ namespace Apache.Ignite.Internal
for (var i = 0; i < count; i++)
{
- var fieldCount = r.ReadInt32();
- Debug.Assert(fieldCount == 4, "fieldCount == 4");
-
- res.Add(new ClusterNode(
- id: r.ReadString(),
- name: r.ReadString(),
- endpoint: new
IPEndPoint(IPAddress.Parse(r.ReadString()), r.ReadInt32())));
+ res.Add(ClusterNode.Read(ref r));
}
return res;
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
index 8534350ae3..d9b78e374d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Network/ClusterNode.cs
@@ -18,8 +18,11 @@
namespace Apache.Ignite.Internal.Network
{
using System;
+ using System.Diagnostics;
+ using System.Linq;
using System.Net;
using Ignite.Network;
+ using Proto.MsgPack;
/// <summary>
/// Cluster node.
@@ -74,5 +77,30 @@ namespace Apache.Ignite.Internal.Network
/// <inheritdoc/>
public override int GetHashCode() => HashCode.Combine(Id, Name,
Address);
+
+ /// <summary>
+ /// Read node from reader.
+ /// </summary>
+ /// <param name="r">Reader.</param>
+ /// <returns>Cluster node.</returns>
+ internal static ClusterNode Read(ref MsgPackReader r)
+ {
+ var fieldCount = r.ReadInt32();
+ Debug.Assert(fieldCount == 4, "fieldCount == 4");
+
+ var id = r.ReadString();
+ var name = r.ReadString();
+ var addr = r.ReadString();
+ var port = r.ReadInt32();
+
+ // TODO IGNITE-22695 .NET: ClusterNode.Address does not support host names
+ var ipAddress = IPAddress.TryParse(addr, out var ip)
+ ? ip
+ : Dns.GetHostEntry(addr).AddressList.FirstOrDefault() ??
IPAddress.Loopback;
+
+ var endPoint = new IPEndPoint(ipAddress, port);
+
+ return new ClusterNode(id, name, endPoint);
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 31929b769b..1c7f5eb760 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -133,6 +133,9 @@ namespace Apache.Ignite.Internal.Proto
/** Send streamer batch. */
StreamerBatchSend = 62,
+ /** Get primary replicas. */
+ PrimaryReplicasGet = 65,
+
/** Send streamer batch with receiver. */
StreamerWithReceiverBatchSend = 66
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 749cc48a71..4fe559e99b 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -69,6 +69,7 @@ namespace Apache.Ignite.Internal.Proto
ClientOp.PartitionAssignmentGet => null,
ClientOp.SqlParamMeta => null,
ClientOp.StreamerBatchSend =>
ClientOperationType.StreamerBatchSend,
+ ClientOp.PrimaryReplicasGet =>
ClientOperationType.PrimaryReplicasGet,
ClientOp.StreamerWithReceiverBatchSend =>
ClientOperationType.StreamerBatchSend,
// Do not return null from default arm intentionally so we don't forget to update this when new ClientOp values are added.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/HashPartition.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/HashPartition.cs
new file mode 100644
index 0000000000..e85c954cac
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/HashPartition.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table;
+
+using Ignite.Table;
+
+/// <summary>
+/// Hash partition.
+/// </summary>
+/// <param name="PartitionId">Partition id.</param>
+internal sealed record HashPartition(int PartitionId) : IPartition // Not a struct to avoid interface boxing.
+{
+ /// <inheritdoc/>
+ public bool Equals(IPartition? other) =>
+ other is HashPartition hashPart && Equals(hashPart);
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/PartitionManager.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/PartitionManager.cs
new file mode 100644
index 0000000000..acfce82fd3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/PartitionManager.cs
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Threading.Tasks;
+using Common;
+using Ignite.Network;
+using Ignite.Table;
+using Network;
+using Proto;
+using Proto.MsgPack;
+using Serialization;
+
+/// <summary>
+/// Table partition manager.
+/// </summary>
+internal sealed class PartitionManager : IPartitionManager
+{
+ private static readonly object PartitionsLock = new();
+
+ // Cached partition objects (HashPartition is just a wrapper around a number).
+ // Those wrappers implement IPartition interface and can't be structs.
+ private static volatile HashPartition[]? _partitions;
+
+ private readonly Table _table;
+
+ private readonly object _primaryReplicasLock = new();
+
+ // Cached primary replicas.
+ private volatile PrimaryReplicas? _primaryReplicas;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PartitionManager"/> class.
+ /// </summary>
+ /// <param name="table">Table.</param>
+ internal PartitionManager(Table table)
+ {
+ _table = table;
+ }
+
+ /// <inheritdoc/>
+ public async ValueTask<IReadOnlyDictionary<IPartition, IClusterNode>>
GetPrimaryReplicasAsync()
+ {
+ var replicas = await
GetPrimaryReplicasInternalAsync().ConfigureAwait(false);
+
+ return replicas.GetDictionary();
+ }
+
+ /// <inheritdoc/>
+ public async ValueTask<IClusterNode> GetPrimaryReplicaAsync(IPartition
partition)
+ {
+ IgniteArgumentCheck.NotNull(partition);
+
+ if (partition is not HashPartition hashPartition)
+ {
+ throw new ArgumentException("Unsupported partition type: " +
partition.GetType());
+ }
+
+ if (hashPartition.PartitionId < 0)
+ {
+ throw new ArgumentException("Partition id can't be negative: " +
partition);
+ }
+
+ var replicas = await
GetPrimaryReplicasInternalAsync().ConfigureAwait(false);
+ var nodes = replicas.Nodes;
+
+ if (hashPartition.PartitionId >= nodes.Length)
+ {
+ throw new ArgumentException($"Partition id can't be greater than
{nodes.Length - 1}: {partition}");
+ }
+
+ return nodes[hashPartition.PartitionId];
+ }
+
+ /// <inheritdoc/>
+ public ValueTask<IPartition> GetPartitionAsync(IIgniteTuple tuple) =>
+ GetPartitionInternalAsync(tuple, TupleSerializerHandler.Instance);
+
+ /// <inheritdoc/>
+ public ValueTask<IPartition> GetPartitionAsync<TK>(TK key)
+ where TK : notnull =>
+ GetPartitionInternalAsync(key,
_table.GetRecordViewInternal<TK>().RecordSerializer.Handler);
+
+ /// <inheritdoc/>
+ public override string ToString() =>
+ new IgniteToStringBuilder(GetType())
+ .Append(_table, "Table")
+ .Build();
+
+ private static HashPartition[] GetCachedPartitionArray(int count)
+ {
+ var parts = _partitions;
+ if (parts != null && parts.Length >= count)
+ {
+ return parts;
+ }
+
+ lock (PartitionsLock)
+ {
+ parts = _partitions;
+ if (parts != null && parts.Length >= count)
+ {
+ return parts;
+ }
+
+ parts = new HashPartition[count];
+ for (var i = 0; i < count; i++)
+ {
+ parts[i] = new HashPartition(i);
+ }
+
+ _partitions = parts;
+ return parts;
+ }
+ }
+
+ private async ValueTask<PrimaryReplicas> GetPrimaryReplicasInternalAsync()
+ {
+ // Socket.PartitionAssignmentTimestamp is updated on every response, including heartbeats,
+ // so the cache can't be stale for very long.
+ var timestamp = _table.Socket.PartitionAssignmentTimestamp;
+ var cached = _primaryReplicas;
+
+ if (cached != null && cached.Timestamp >= timestamp)
+ {
+ return cached;
+ }
+
+ using var bufferWriter = ProtoCommon.GetMessageWriter();
+ bufferWriter.MessageWriter.Write(_table.Id);
+
+ using var resBuf = await
_table.Socket.DoOutInOpAsync(ClientOp.PrimaryReplicasGet,
bufferWriter).ConfigureAwait(false);
+ return Read(resBuf.GetReader());
+
+ PrimaryReplicas Read(MsgPackReader r)
+ {
+ var count = r.ReadInt32();
+ var primaryReplicas = new ClusterNode[count];
+
+ for (var i = 0; i < count; i++)
+ {
+ var id = r.ReadInt32();
+ var node = ClusterNode.Read(ref r);
+
+ primaryReplicas[id] = node;
+ }
+
+ PrimaryReplicas? replicas;
+
+ lock (_primaryReplicasLock)
+ {
+ replicas = _primaryReplicas;
+
+ if (replicas == null || replicas.Timestamp < timestamp)
+ {
+ // Got newer data - update cached replicas.
+ replicas = new PrimaryReplicas(primaryReplicas, timestamp);
+ _primaryReplicas = replicas;
+ }
+ }
+
+ return replicas;
+ }
+ }
+
+ private async ValueTask<IPartition> GetPartitionInternalAsync<TK>(TK key,
IRecordSerializerHandler<TK> serializerHandler)
+ {
+ var schema = await _table.GetSchemaAsync(null).ConfigureAwait(false);
+ var colocationHash = serializerHandler.GetKeyColocationHash(schema,
key);
+
+ var partitions = await
GetPrimaryReplicasInternalAsync().ConfigureAwait(false);
+ var partitionsCount = partitions.Nodes.Length;
+
+ var partitionId = Math.Abs(colocationHash % partitionsCount);
+ return GetCachedPartitionArray(partitionsCount)[partitionId];
+ }
+
+ [SuppressMessage("Performance", "CA1819:Properties should not return
arrays", Justification = "Private record.")]
+ private record PrimaryReplicas(ClusterNode[] Nodes, long Timestamp)
+ {
+ private volatile ReadOnlyDictionary<IPartition, IClusterNode>? _dict;
+
+ public ReadOnlyDictionary<IPartition, IClusterNode> GetDictionary()
+ {
+ var dict = _dict;
+
+ // Race condition here is ok, one of the threads will just overwrite the other's result with the same data.
+ if (dict == null)
+ {
+ var res = new Dictionary<IPartition, IClusterNode>();
+ var parts = GetCachedPartitionArray(Nodes.Length);
+
+ for (var i = 0; i < Nodes.Length; i++)
+ {
+ res.Add(parts[i], Nodes[i]);
+ }
+
+ dict = new ReadOnlyDictionary<IPartition, IClusterNode>(res);
+ _dict = dict;
+ }
+
+ return dict;
+ }
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
index 3446f3bada..3eac886466 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/IRecordSerializerHandler.cs
@@ -71,6 +71,30 @@ namespace Apache.Ignite.Internal.Table.Serialization
}
}
+ /// <summary>
+ /// Gets the colocation hash.
+ /// </summary>
+ /// <param name="schema">Schema.</param>
+ /// <param name="key">Key.</param>
+ /// <returns>Colocation hash.</returns>
+ int GetKeyColocationHash(Schema schema, T key)
+ {
+ var tupleBuilder = new BinaryTupleBuilder(
+ numElements: schema.KeyColumns.Length,
+ hashedColumnsPredicate:
schema.KeyOnlyHashedColumnIndexProvider);
+
+ try
+ {
+ Write(ref tupleBuilder, key, schema, keyOnly: true,
Span<byte>.Empty);
+
+ return tupleBuilder.GetHash();
+ }
+ finally
+ {
+ tupleBuilder.Dispose();
+ }
+ }
+
/// <summary>
/// Writes a record.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 196cc852b7..c72360a33c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -110,6 +110,8 @@ namespace Apache.Ignite.Internal.Table
KeyValueBinaryView = new KeyValueView<IIgniteTuple, IIgniteTuple>(
new RecordView<KvPair<IIgniteTuple, IIgniteTuple>>(this,
pairSerializer, _sql));
+
+ PartitionManager = new PartitionManager(this);
}
/// <inheritdoc/>
@@ -121,6 +123,9 @@ namespace Apache.Ignite.Internal.Table
/// <inheritdoc/>
public IKeyValueView<IIgniteTuple, IIgniteTuple> KeyValueBinaryView {
get; }
+ /// <inheritdoc/>
+ public IPartitionManager PartitionManager { get; }
+
/// <summary>
/// Gets the associated socket.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 6f76a2fc7d..9e02579fa5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -63,6 +63,7 @@ namespace Apache.Ignite
ClientOperationType.ComputeGetStatus => true,
ClientOperationType.StreamerBatchSend => false,
ClientOperationType.StreamerWithReceiverBatchSend => false,
+ ClientOperationType.PrimaryReplicasGet => true,
var unsupported => throw new
NotSupportedException("Unsupported operation type: " + unsupported)
};
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IPartition.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IPartition.cs
new file mode 100644
index 0000000000..69c374c13d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IPartition.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+/// <summary>
+/// Table partition.
+/// </summary>
+[SuppressMessage("Design", "CA1040:Avoid empty interfaces", Justification =
"Reviewed.")]
+public interface IPartition : IEquatable<IPartition>
+{
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IPartitionManager.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/IPartitionManager.cs
new file mode 100644
index 0000000000..5e166b1c84
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IPartitionManager.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Table;
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Network;
+
+/// <summary>
+/// Partition manager provides table partition information.
+/// </summary>
+public interface IPartitionManager
+{
+ /// <summary>
+ /// Gets the primary replicas for all partitions.
+ /// <para />
+ /// NOTE: Prefer <see cref="GetPrimaryReplicaAsync"/> for performance-critical code.
+ /// </summary>
+ /// <returns>Map of partition to primary replica node.</returns>
+ ValueTask<IReadOnlyDictionary<IPartition, IClusterNode>>
GetPrimaryReplicasAsync();
+
+ /// <summary>
+ /// Gets the primary replica for the specified partition.
+ /// <para />
+ /// NOTE: Prefer this method over <see cref="GetPrimaryReplicasAsync"/> for performance-critical code.
+ /// </summary>
+ /// <param name="partition">Partition.</param>
+ /// <returns>Primary replica.</returns>
+ ValueTask<IClusterNode> GetPrimaryReplicaAsync(IPartition partition);
+
+ /// <summary>
+ /// Gets the partition for the specified table key.
+ /// </summary>
+ /// <param name="tuple">Table key tuple.</param>
+ /// <returns>Partition that contains the specified key.</returns>
+ ValueTask<IPartition> GetPartitionAsync(IIgniteTuple tuple);
+
+ /// <summary>
+ /// Gets the partition for the specified table key.
+ /// </summary>
+ /// <param name="key">Table key.</param>
+ /// <returns>Partition that contains the specified key.</returns>
+ /// <typeparam name="TK">Key type.</typeparam>
+ ValueTask<IPartition> GetPartitionAsync<TK>(TK key)
+ where TK : notnull;
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
b/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
index 8750d55a92..61635972e3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/ITable.cs
@@ -37,6 +37,11 @@ namespace Apache.Ignite.Table
/// </summary>
public IKeyValueView<IIgniteTuple, IIgniteTuple> KeyValueBinaryView {
get; }
+ /// <summary>
+ /// Gets the partition manager.
+ /// </summary>
+ public IPartitionManager PartitionManager { get; }
+
/// <summary>
/// Gets the record view mapped to specified type <typeparamref name="T"/>.
/// <para />
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 95bcc56adc..ad12c6ed2d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -79,6 +79,7 @@ import
org.apache.ignite.internal.security.configuration.SecurityChange;
import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.table.RecordBinaryViewImpl;
+import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -90,6 +91,7 @@ import org.apache.ignite.table.DataStreamerReceiverContext;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
/**
@@ -852,4 +854,16 @@ public class PlatformTestNodeRunner {
return CompletableFuture.completedFuture(page);
}
}
+
+ @SuppressWarnings("unused") // Used by platform tests.
+ private static class PartitionJob implements ComputeJob<Long, Integer> {
+ @Override
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Long id) {
+ Table table = context.ignite().tables().table(TABLE_NAME);
+ Tuple key = Tuple.create().set("key", id);
+ Partition partition =
table.partitionManager().partitionAsync(key).join();
+
+ return completedFuture(((HashPartition) partition).partitionId());
+ }
+ }
}