This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 98b74c2567b IGNITE-26853 .NET: Add SQL partition awareness (#7599)
98b74c2567b is described below
commit 98b74c2567b5cbbdf9d16c8b2ceccd477a77f782
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Feb 17 08:36:55 2026 +0200
IGNITE-26853 .NET: Add SQL partition awareness (#7599)
Protocol:
- Add table name to SQL partition awareness metadata
(`SQL_PARTITION_AWARENESS_TABLE_NAME` feature) to enable table cache reuse
Public API:
- Add `IgniteClientConfiguration.SqlPartitionAwarenessMetadataCacheSize`
Impl:
- Reuse existing table cache for partition assignments and schemas
- Add `ConcurrentCache<K, V>` for SQL partition awareness meta
- Wraps `ConcurrentDictionary` with simple SIEVE-like eviction
---
.../internal/client/proto/HandshakeUtils.java | 1 +
.../client/proto/ProtocolBitmaskFeature.java | 7 +-
.../client/handler/ItClientHandlerMetricsTest.java | 4 +-
.../ignite/client/handler/ItClientHandlerTest.java | 4 +-
.../ignite/client/handler/ClientHandlerModule.java | 3 +-
.../handler/ClientInboundMessageHandler.java | 23 +-
.../handler/requests/sql/ClientSqlCommon.java | 20 +-
.../sql/ClientSqlCursorNextResultRequest.java | 10 +-
.../requests/sql/ClientSqlExecuteRequest.java | 13 +-
.../org/apache/ignite/client/fakes/FakeCursor.java | 6 +-
.../Apache.Ignite.Tests/ConcurrentCacheTest.cs | 133 +++++++++
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 4 +-
.../dotnet/Apache.Ignite.Tests/MetricsTests.cs | 6 +-
.../PartitionAwarenessRealClusterTests.cs | 319 +++++++++++++++++++--
.../Proto/ProtocolBitmaskFeatureExtensionsTest.cs | 170 +++++++++++
.../Apache.Ignite/IgniteClientConfiguration.cs | 33 +++
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 46 ++-
...ProtocolBitmaskFeature.cs => ClientResponse.cs} | 46 ++-
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 13 +-
.../Internal/Common/ConcurrentCache.cs | 124 ++++++++
.../Apache.Ignite/Internal/IgniteClientInternal.cs | 6 +-
.../Proto/BinaryTuple/BinaryTupleBuilder.cs | 9 +
.../Internal/Proto/ProtocolBitmaskFeature.cs | 12 +-
.../Proto/ProtocolBitmaskFeatureExtensions.cs | 67 +++++
.../dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs | 68 ++++-
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 68 ++++-
.../Internal/Sql/SqlPartitionAwarenessMetadata.cs | 51 ++++
.../Internal/Sql/SqlPartitionMappingProvider.cs | 105 +++++++
.../dotnet/Apache.Ignite/Internal/Table/Schema.cs | 18 ++
.../dotnet/Apache.Ignite/Internal/Table/Tables.cs | 41 +--
.../Sql/IgniteDbConnectionStringBuilder.cs | 17 +-
.../PartitionAwarenessMetadata.java | 13 +-
.../PartitionAwarenessMetadataExtractor.java | 13 +-
.../PartitionAwarenessMetadataTest.java | 3 +-
34 files changed, 1339 insertions(+), 137 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/HandshakeUtils.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/HandshakeUtils.java
index e544c4fe341..a8d6de36fee 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/HandshakeUtils.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/HandshakeUtils.java
@@ -85,6 +85,7 @@ public class HandshakeUtils {
* @return Features.
*/
public static BitSet unpackFeatures(ClientMessageUnpacker unpacker) {
+ // BitSet.valueOf is always little-endian.
return BitSet.valueOf(unpacker.readBinary());
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 3e9e2417434..363070971d0 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -102,7 +102,12 @@ public enum ProtocolBitmaskFeature {
/**
* Send remote writes flag for directly mapped transactions.
*/
- TX_DIRECT_MAPPING_SEND_REMOTE_WRITES(15);
+ TX_DIRECT_MAPPING_SEND_REMOTE_WRITES(15),
+
+ /**
+ * Client supports Partition Awareness for SQL queries with table name in the response metadata.
+ */
+ SQL_PARTITION_AWARENESS_TABLE_NAME(16);
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
index 95ef46fdbfc..da548cb8305 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerMetricsTest.java
@@ -171,7 +171,7 @@ public class ItClientHandlerMetricsTest extends
BaseIgniteAbstractTest {
ItClientHandlerTestUtils.connectAndHandshake(serverModule);
assertTrue(
- IgniteTestUtils.waitForCondition(() ->
testServer.metrics().bytesSent() == 107, 1000),
+ IgniteTestUtils.waitForCondition(() ->
testServer.metrics().bytesSent() == 108, 1000),
() -> "bytesSent: " + testServer.metrics().bytesSent());
assertTrue(
@@ -181,7 +181,7 @@ public class ItClientHandlerMetricsTest extends
BaseIgniteAbstractTest {
ItClientHandlerTestUtils.connectAndHandshake(serverModule, false,
true);
assertTrue(
- IgniteTestUtils.waitForCondition(() ->
testServer.metrics().bytesSent() == 319, 1000),
+ IgniteTestUtils.waitForCondition(() ->
testServer.metrics().bytesSent() == 320, 1000),
() -> "bytesSent: " + testServer.metrics().bytesSent());
assertTrue(
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index cc5d65f275b..746b1262f9b 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -158,7 +158,7 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
unpacker.skipValue(extensionsLen);
assertArrayEquals(MAGIC, magic);
- assertEquals(99, len);
+ assertEquals(100, len);
assertEquals(3, major);
assertEquals(0, minor);
assertEquals(0, patch);
@@ -561,6 +561,8 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
expected.set(13);
expected.set(14);
expected.set(15);
+ expected.set(16);
+
assertEquals(expected, supportedFeatures);
var extensionsLen = unpacker.unpackInt();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 0bab18395fe..8bb881142b7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -99,7 +99,8 @@ public class ClientHandlerModule implements IgniteComponent,
PlatformComputeTran
ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
- ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
+ ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
+ ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME
));
/** Connection id generator.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 8b89bbfb58a..94d8da215ac 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.client.handler;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ALLOW_NOOP_ENLIST;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS;
@@ -996,10 +997,24 @@ public class ClientInboundMessageHandler
case ClientOp.SQL_EXEC:
return ClientSqlExecuteRequest.process(
- partitionOperationsExecutor, in, requestId,
cancelHandles, queryProcessor, resources, metrics, tsTracker,
- clientContext.hasFeature(SQL_PARTITION_AWARENESS),
clientContext.hasFeature(SQL_DIRECT_TX_MAPPING), txManager,
- igniteTables, clockService,
notificationSender(requestId), resolveCurrentUsername(),
- clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
queryTypeListener
+ partitionOperationsExecutor,
+ in,
+ requestId,
+ cancelHandles,
+ queryProcessor,
+ resources,
+ metrics,
+ tsTracker,
+ clientContext.hasFeature(SQL_PARTITION_AWARENESS),
+ clientContext.hasFeature(SQL_DIRECT_TX_MAPPING),
+ txManager,
+ igniteTables,
+ clockService,
+ notificationSender(requestId),
+ resolveCurrentUsername(),
+ clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
+ clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME),
+ queryTypeListener
);
case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index 7c842d51b96..aa8acf537b5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -272,6 +272,7 @@ class ClientSqlCommon {
boolean includePartitionAwarenessMeta,
boolean sqlDirectTxMappingSupported,
boolean sqlMultiStatementSupported,
+ boolean sqlPartitionAwarenessQualifiedNameSupported,
Executor executor
) {
try {
@@ -292,13 +293,15 @@ class ClientSqlCommon {
return CompletableFuture.completedFuture(out ->
writeResultSet(out, asyncResultSet, resourceId,
includePartitionAwarenessMeta,
- sqlDirectTxMappingSupported,
sqlMultiStatementSupported, nextResultResourceId));
+ sqlDirectTxMappingSupported,
sqlMultiStatementSupported, sqlPartitionAwarenessQualifiedNameSupported,
+ nextResultResourceId));
}
return asyncResultSet.closeAsync()
.thenApply(v -> (ResponseWriter) out ->
writeResultSet(out, asyncResultSet, null,
includePartitionAwarenessMeta,
- sqlDirectTxMappingSupported,
sqlMultiStatementSupported, nextResultResourceId));
+ sqlDirectTxMappingSupported,
sqlMultiStatementSupported, sqlPartitionAwarenessQualifiedNameSupported,
+ nextResultResourceId));
} catch (IgniteInternalCheckedException e) {
// Resource registry was closed.
@@ -349,6 +352,7 @@ class ClientSqlCommon {
boolean includePartitionAwarenessMeta,
boolean sqlDirectTxMappingSupported,
boolean sqlMultiStatementsSupported,
+ boolean sqlPartitionAwarenessQualifiedNameSupported,
@Nullable Long nextResultResourceId
) {
out.packLongNullable(resourceId);
@@ -361,7 +365,8 @@ class ClientSqlCommon {
packMeta(out, res.metadata());
if (includePartitionAwarenessMeta) {
- packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(),
sqlDirectTxMappingSupported);
+ packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(),
sqlDirectTxMappingSupported,
+ sqlPartitionAwarenessQualifiedNameSupported);
}
if (sqlMultiStatementsSupported) {
@@ -386,7 +391,8 @@ class ClientSqlCommon {
private static void packPartitionAwarenessMeta(
ClientMessagePacker out,
@Nullable PartitionAwarenessMetadata meta,
- boolean sqlDirectTxMappingSupported
+ boolean sqlDirectTxMappingSupported,
+ boolean sqlPartitionAwarenessQualifiedNameSupported
) {
if (meta == null) {
out.packNil();
@@ -394,6 +400,12 @@ class ClientSqlCommon {
}
out.packInt(meta.tableId());
+
+ if (sqlPartitionAwarenessQualifiedNameSupported) {
+ out.packString(meta.tableName().schemaName());
+ out.packString(meta.tableName().objectName());
+ }
+
out.packIntArray(meta.indexes());
out.packIntArray(meta.hash());
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
index 0c6025aad9e..615ad7678cc 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
@@ -59,7 +59,15 @@ public class ClientSqlCursorNextResultRequest {
pageSize
)
).thenCompose(asyncResultSet ->
- ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, pageSize, false, false, true,
+ ClientSqlCommon.writeResultSetAsync(
+ resources,
+ asyncResultSet,
+ metrics,
+ pageSize,
+ false,
+ false,
+ true,
+ false,
operationExecutor)
).thenApply(rsWriter -> rsWriter), operationExecutor);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index baab0d9a4a0..cbcd37de96b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -99,6 +99,7 @@ public class ClientSqlExecuteRequest {
NotificationSender notificationSender,
@Nullable String username,
boolean sqlMultistatementsSupported,
+ boolean sqlPartitionAwarenessQualifiedNameSupported,
Consumer<SqlQueryType> queryTypeListener
) {
CancelHandle cancelHandle = CancelHandle.create();
@@ -141,8 +142,16 @@ public class ClientSqlExecuteRequest {
queryTypeListener,
arguments
).thenCompose(asyncResultSet ->
- ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, props.pageSize(),
- includePartitionAwarenessMeta,
sqlDirectTxMappingSupported, sqlMultistatementsSupported, operationExecutor))
+ ClientSqlCommon.writeResultSetAsync(
+ resources,
+ asyncResultSet,
+ metrics,
+ props.pageSize(),
+ includePartitionAwarenessMeta,
+ sqlDirectTxMappingSupported,
+ sqlMultistatementsSupported,
+ sqlPartitionAwarenessQualifiedNameSupported,
+ operationExecutor))
.thenApply(rsWriter -> out -> {
if (tx != null) {
writeTxMeta(out, timestampTracker, clockService, tx,
resIdHolder[0]);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index 9372dc85e0d..231ee2349b0 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.sql.engine.util.ListToInternalSqlRowAdapter;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.table.QualifiedName;
import org.jetbrains.annotations.Nullable;
/**
@@ -113,11 +114,12 @@ public class FakeCursor implements
AsyncSqlCursor<InternalSqlRow> {
columns.add(new FakeColumnMetadata("script", ColumnType.STRING));
} else if ("SELECT PA".equals(qry)) {
paMeta = new PartitionAwarenessMetadata(1, new int[] {0, -1, -2,
2}, new int[] {100, 500},
- DirectTxMode.SUPPORTED_TRACKING_REQUIRED);
+ DirectTxMode.SUPPORTED_TRACKING_REQUIRED,
QualifiedName.parse("PUBLIC.TBL"));
rows.add(getRow(1));
columns.add(new FakeColumnMetadata("col1", ColumnType.INT32));
} else if ("SELECT SINGLE COLUMN PA".equals(qry)) {
- paMeta = new PartitionAwarenessMetadata(100500, new int[] {0}, new
int[0], DirectTxMode.SUPPORTED);
+ paMeta = new PartitionAwarenessMetadata(100500, new int[] {0}, new
int[0], DirectTxMode.SUPPORTED,
+ QualifiedName.parse("PUBLIC.TBL"));
rows.add(getRow(1));
columns.add(new FakeColumnMetadata("col1", ColumnType.INT32));
} else if ("SELECT ALLOWED QUERY TYPES".equals(qry)) {
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/ConcurrentCacheTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/ConcurrentCacheTest.cs
new file mode 100644
index 00000000000..d1f9e16d4a3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ConcurrentCacheTest.cs
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests;
+
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Common;
+using Internal.Common;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="ConcurrentCache{TKey,TValue}"/>.
+/// </summary>
+public class ConcurrentCacheTest
+{
+ [Test]
+ public void TestTryAdd()
+ {
+ var cache = new ConcurrentCache<int, string>(10);
+
+ Assert.IsTrue(cache.TryAdd(1, "one"));
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ }
+
+ [Test]
+ public void TestTryAddExistingKey()
+ {
+ var cache = new ConcurrentCache<int, string>(10);
+
+ Assert.IsTrue(cache.TryAdd(1, "one"));
+ Assert.IsFalse(cache.TryAdd(1, "uno"));
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ }
+
+ [Test]
+ public void TestGetValueOrDefaultMissingKey()
+ {
+ var cache = new ConcurrentCache<int, string>(10);
+
+ Assert.IsNull(cache.GetValueOrDefault(1));
+ }
+
+ [Test]
+ public void TestEviction()
+ {
+ var cache = new ConcurrentCache<int, string>(2);
+
+ cache.TryAdd(1, "one");
+ cache.TryAdd(2, "two");
+ cache.GetValueOrDefault(1); // Mark entry 1 as visited
+ cache.TryAdd(3, "four");
+
+ Assert.IsNull(cache.GetValueOrDefault(2), "Unused entry 2 should be evicted");
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ Assert.AreEqual("four", cache.GetValueOrDefault(3));
+ }
+
+ [Test]
+ public void TestMultipleVisitedEntriesArePreserved()
+ {
+ var cache = new ConcurrentCache<int, string>(3);
+
+ cache.TryAdd(1, "one");
+ cache.TryAdd(2, "two");
+ cache.TryAdd(3, "three");
+
+ // Mark entries 1 and 2 as visited
+ cache.GetValueOrDefault(1);
+ cache.GetValueOrDefault(2);
+
+ cache.TryAdd(4, "four");
+
+ Assert.IsNull(cache.GetValueOrDefault(3), "Unused entry 3 should be evicted");
+ Assert.AreEqual("one", cache.GetValueOrDefault(1));
+ Assert.AreEqual("two", cache.GetValueOrDefault(2));
+ Assert.AreEqual("four", cache.GetValueOrDefault(4));
+ }
+
+ [Test]
+ public void TestEvictUnderLoad()
+ {
+ var cts = new CancellationTokenSource();
+ var keys = Enumerable.Range(1, 10).ToArray();
+ var cache = new ConcurrentCache<int, string>(keys.Length);
+ var gate = new ManualResetEventSlim();
+
+ foreach (var key in keys)
+ {
+ cache.TryAdd(key, string.Empty);
+ }
+
+ var visiterTask = Task.Run(() =>
+ {
+ while (!cts.Token.IsCancellationRequested)
+ {
+ foreach (var key in keys)
+ {
+ cache.GetValueOrDefault(key);
+ }
+
+ gate.Set();
+ }
+ });
+
+ using var cleaner = new DisposeAction(() =>
+ {
+ cts.Cancel();
+ TestUtils.WaitForCondition(() => visiterTask.IsCompleted);
+ });
+
+ gate.Wait();
+
+ var added = cache.TryAdd(-1, "new");
+
+ Assert.IsTrue(added, "Should be able to add new entry");
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index aa2f757b8ad..6fb4011e891 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -210,8 +210,8 @@ namespace Apache.Ignite.Tests
else
{
// Test that client skips those correctly.
- handshakeWriter.WriteBinaryHeader(3); // Features.
- handshakeWriter.Write([1, 2, 3]); // Random feature bits
+ handshakeWriter.WriteBinaryHeader(4); // Features.
+ handshakeWriter.Write([0, 0, 255, 255]); // Unknown feature bits
handshakeWriter.Write(5); // Extensions.
for (int i = 0; i < 5; i++)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 291af963a9a..0144d5d0c11 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -98,15 +98,15 @@ public class MetricsTests
using var client = await server.ConnectClientAsync();
- AssertMetric(MetricNames.BytesSent, 16);
+ AssertMetric(MetricNames.BytesSent, 19);
AssertMetric(MetricNames.BytesReceived, 91);
await client.Tables.GetTablesAsync();
- AssertMetric(MetricNames.BytesSent, 22);
+ AssertMetric(MetricNames.BytesSent, 25);
AssertMetric(MetricNames.BytesReceived, 100);
- AssertTaggedMetric(MetricNames.BytesSent, 22, server, client);
+ AssertTaggedMetric(MetricNames.BytesSent, 25, server, client);
AssertTaggedMetric(MetricNames.BytesReceived, 100, server, client);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
index 9235e09461b..276e701e1af 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
@@ -21,7 +21,6 @@ using System;
using System.Threading.Tasks;
using Common;
using Common.Compute;
-using Compute;
using Ignite.Compute;
using Ignite.Table;
using Internal.Proto;
@@ -30,40 +29,308 @@ using NUnit.Framework;
using static Common.Table.TestTables;
/// <summary>
-/// Tests partition awareness in real cluster.
+/// Tests partition awareness in a real cluster.
/// </summary>
public class PartitionAwarenessRealClusterTests : IgniteTestsBase
{
+ private const int Iterations = 50;
+
+ private string _tableName = string.Empty;
+
+ [TearDown]
+ public async Task DropTables()
+ {
+ if (!string.IsNullOrWhiteSpace(_tableName))
+ {
+ await Client.Sql.ExecuteScriptAsync($"DROP TABLE IF EXISTS {_tableName}");
+ }
+ }
+
/// <summary>
- /// Uses <see cref="ComputeTests.NodeNameJob"/> to get the name of the
node that should be the primary for the given key,
+ /// Uses <see cref="JavaJobs.NodeNameJob"/> to get the name of the node that should be the primary for the given key,
/// and compares to the actual node that received the request (using
IgniteProxy).
/// </summary>
/// <param name="withTx">Whether to use transactions.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous unit
test.</returns>
[Test]
public async Task TestPutRoutesRequestToPrimaryNode([Values(true, false)]
bool withTx)
+ {
+ await TestRequestRouting(
+ TableName,
+ id => new IgniteTuple { ["KEY"] = id },
+ async (client, view, tuple) =>
+ {
+ await using var tx = withTx ? await
client.Transactions.BeginAsync() : null;
+ await view.UpsertAsync(tx, tuple);
+ },
+ ClientOp.TupleUpsert);
+ }
+
+ [Test]
+ [TestCase("SELECT * FROM TBL1 WHERE KEY = ?")]
+ [TestCase("SELECT * FROM TBL1 WHERE 1 = 1 AND KEY = ?")]
+ [TestCase("SELECT * FROM TBL1 WHERE VAL IS NOT NULL AND KEY = ? AND 2 = 2")]
+ public async Task TestSqlSimpleKey(string query)
+ {
+ await TestRequestRouting(
+ TableName,
+ id => new IgniteTuple { ["KEY"] = id },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: query,
+ tuple[KeyCol]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ [TestCase("SELECT * FROM TBL1 WHERE KEY >= ? AND KEY = ? AND KEY <= ?")]
+ [TestCase("SELECT * FROM TBL1 WHERE 1 = ? AND KEY = ? AND 2 = ?")]
+ public async Task TestSqlSimpleKeyWithExtraArgs(string query)
+ {
+ await TestRequestRouting(
+ TableName,
+ id => new IgniteTuple { ["KEY"] = id },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: query,
+ tuple[KeyCol],
+ tuple[KeyCol],
+ tuple[KeyCol]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlCompositeKey()
+ {
+ await CreateTable("(KEY BIGINT, VAL1 VARCHAR, VAL2 VARCHAR, PRIMARY KEY (KEY, VAL2))");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple { ["KEY"] = id, ["VAL1"] = $"v1_{id}",
["VAL2"] = $"v2_{id}" },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE KEY = ? AND VAL2 = ?",
+ tuple["KEY"],
+ tuple["VAL2"]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlCompositeKeyConstants()
+ {
+ await CreateTable("(KEY BIGINT, VAL1 VARCHAR, VAL2 VARCHAR, PRIMARY KEY (KEY, VAL2))");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple { ["KEY"] = 42L, ["VAL1"] = $"v1_{id}",
["VAL2"] = $"v2_{id}" },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE 1 = ? AND 2 = ? AND KEY = 42 AND VAL2 = ? AND 3 = ?",
+ 1,
+ 2,
+ tuple["VAL2"],
+ 3);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlColocateBy()
+ {
+ await CreateTable(
+ "(KEY1 INT, KEY2 VARCHAR, COL_STRING VARCHAR, COL_GUID UUID, COL_BYTES VARBINARY, " +
+ "EXTRA1 VARCHAR, EXTRA2 VARCHAR, " +
+ "PRIMARY KEY (KEY2, KEY1, COL_STRING, COL_GUID, COL_BYTES)) " +
+ "COLOCATE BY (KEY1, COL_GUID, COL_STRING, COL_BYTES)");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple
+ {
+ ["KEY1"] = (int)id,
+ ["KEY2"] = $"key2_{id}",
+ ["COL_STRING"] = $"str_{id}",
+ ["COL_GUID"] = Guid.NewGuid(),
+ ["COL_BYTES"] = new[] { (byte)id, (byte)(id >> 8) },
+ ["EXTRA1"] = $"extra1_{id}",
+ ["EXTRA2"] = $"extra2_{id}"
+ },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE KEY1 = ? AND COL_STRING = ? AND COL_GUID = ? AND COL_BYTES = ?",
+ tuple["KEY1"],
+ tuple["COL_STRING"],
+ tuple["COL_GUID"],
+ tuple["COL_BYTES"]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlColocateByAllConstants()
+ {
+ await CreateTable(
+ "(KEY1 INT, KEY2 VARCHAR, VAL INT, " +
+ "PRIMARY KEY (KEY1, KEY2)) " +
+ "COLOCATE BY (KEY1, KEY2)");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple
+ {
+ ["KEY1"] = 42,
+ ["KEY2"] = "constant_key",
+ ["VAL"] = (int)id
+ },
+ async (client, _, _) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE KEY1 = 42 AND KEY2 = 'constant_key'");
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlColocateByMixedConstantsMiddle()
+ {
+ await CreateTable(
+ "(KEY1 INT, KEY2 VARCHAR, KEY3 INT, VAL VARCHAR, " +
+ "PRIMARY KEY (KEY1, KEY2, KEY3)) " +
+ "COLOCATE BY (KEY1, KEY2, KEY3)");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple
+ {
+ ["KEY1"] = (int)id,
+ ["KEY2"] = "fixed",
+ ["KEY3"] = (int)(id * 2),
+ ["VAL"] = $"val_{id}"
+ },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE KEY1 = ? AND KEY2 = 'fixed' AND KEY3 = ?",
+ tuple["KEY1"],
+ tuple["KEY3"]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlSimpleKeyWithTransaction()
+ {
+ await TestRequestRouting(
+ TableName,
+ id => new IgniteTuple { ["KEY"] = id },
+ async (client, _, tuple) =>
+ {
+ await using var tx = await client.Transactions.BeginAsync();
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: tx,
+ statement: "SELECT * FROM TBL1 WHERE KEY = ?",
+ tuple[KeyCol]);
+ await tx.CommitAsync();
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlColocateByDifferentOrder()
+ {
+ await CreateTable(
+ "(KEY1 INT, KEY2 INT, KEY3 INT, VAL VARCHAR, " +
+ "PRIMARY KEY (KEY1, KEY2, KEY3)) " +
+ "COLOCATE BY (KEY1, KEY2)");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple
+ {
+ ["KEY1"] = (int)id,
+ ["KEY2"] = (int)(id * 2),
+ ["KEY3"] = (int)(id * 3),
+ ["VAL"] = $"val_{id}"
+ },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE KEY2 = ? AND KEY1 = ? AND KEY3 = ?",
+ tuple["KEY2"],
+ tuple["KEY1"],
+ tuple["KEY3"]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ [Test]
+ public async Task TestSqlTableNameWithSpaces()
+ {
+ _tableName = "\"Table With Spaces\"";
+
+ await Client.Sql.ExecuteScriptAsync($"CREATE TABLE {_tableName} (KEY BIGINT PRIMARY KEY, VAL VARCHAR)");
+
+ await TestRequestRouting(
+ _tableName,
+ id => new IgniteTuple { ["KEY"] = id, ["VAL"] = $"val_{id}" },
+ async (client, _, tuple) =>
+ {
+ await using var resultSet = await client.Sql.ExecuteAsync(
+ transaction: null,
+ statement: $"SELECT * FROM {_tableName} WHERE KEY = ?",
+ tuple["KEY"]);
+ },
+ ClientOp.SqlExec);
+ }
+
+ private static async Task<string>
GetPrimaryNodeNameWithJavaJob(IIgniteClient client, string tableName,
IIgniteTuple tuple)
+ {
+ var primaryNodeNameExec = await client.Compute.SubmitAsync(
+ JobTarget.Colocated(tableName, tuple),
+ JavaJobs.NodeNameJob,
+ null);
+
+ return await primaryNodeNameExec.GetResultAsync();
+ }
+
+ private async Task TestRequestRouting(
+ string tableName,
+ Func<long, IIgniteTuple> tupleFactory,
+ Func<IIgniteClient, IRecordView<IIgniteTuple>, IIgniteTuple, Task>
operation,
+ ClientOp expectedOp)
{
using var loggerFactory = new ConsoleLogger(LogLevel.Trace);
var proxies = GetProxies();
using var client = await IgniteClient.StartAsync(GetConfig(proxies,
loggerFactory));
- var recordView = (await
client.Tables.GetTableAsync(TableName))!.RecordBinaryView;
+ var recordView = (await
client.Tables.GetTableAsync(tableName))!.RecordBinaryView;
client.WaitForConnections(proxies.Count);
// Warm up.
- await recordView.GetAsync(null, new IgniteTuple { ["KEY"] = 1L });
+ await operation(client, recordView, tupleFactory(-1));
+ GetRequestTargetNodeName(proxies, expectedOp);
// Check.
- for (long key = 0; key < 50; key++)
+ for (long key = 0; key < Iterations; key++)
{
- var keyTuple = new IgniteTuple { ["KEY"] = key };
-
- var primaryNodeNameExec = await client.Compute.SubmitAsync(
- JobTarget.Colocated(TableName, keyTuple),
- JavaJobs.NodeNameJob,
- null);
-
- var primaryNodeName = await primaryNodeNameExec.GetResultAsync();
+ var tuple = tupleFactory(key);
+ var primaryNodeName = await GetPrimaryNodeNameWithJavaJob(client,
tableName, tuple);
if (primaryNodeName.EndsWith("_3", StringComparison.Ordinal) ||
primaryNodeName.EndsWith("_4", StringComparison.Ordinal))
{
@@ -71,22 +338,18 @@ public class PartitionAwarenessRealClusterTests :
IgniteTestsBase
continue;
}
- var tx = withTx ? await client.Transactions.BeginAsync() : null;
+ await operation(client, recordView, tuple);
- try
- {
- await recordView.UpsertAsync(tx, keyTuple);
- var requestTargetNodeName = GetRequestTargetNodeName(proxies,
ClientOp.TupleUpsert);
+ var requestTargetNodeName = GetRequestTargetNodeName(proxies,
expectedOp);
- Assert.AreEqual(primaryNodeName, requestTargetNodeName);
- }
- finally
- {
- if (tx != null)
- {
- await tx.RollbackAsync();
- }
- }
+ Assert.AreEqual(primaryNodeName, requestTargetNodeName);
}
}
+
+    // Stores a table name built from this class name and the current NUnit test name
+    // in _tableName, then creates that table with the supplied column definition.
+    private async Task CreateTable(string columns)
+    {
+        var currentTestName = TestContext.CurrentContext.Test.Name;
+        _tableName = $"{nameof(PartitionAwarenessRealClusterTests)}_{currentTestName}";
+
+        await Client.Sql.ExecuteScriptAsync($"CREATE TABLE {_tableName} {columns}");
+    }
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ProtocolBitmaskFeatureExtensionsTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ProtocolBitmaskFeatureExtensionsTest.cs
new file mode 100644
index 00000000000..e9905c916de
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ProtocolBitmaskFeatureExtensionsTest.cs
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Proto;
+
+using System;
+using Internal.Proto;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="ProtocolBitmaskFeatureExtensions"/>.
+/// </summary>
+public class ProtocolBitmaskFeatureExtensionsTest
+{
+    /** Three lowest feature bits combined: 1 + 2 + 4 = 7. */
+    private const ProtocolBitmaskFeature LowThreeFeatures =
+        ProtocolBitmaskFeature.UserAttributes |
+        ProtocolBitmaskFeature.TableReqsUseQualifiedName |
+        ProtocolBitmaskFeature.TxDirectMapping;
+
+    /** Every feature flag exercised by the "all features" tests. */
+    private const ProtocolBitmaskFeature AllFeatures =
+        LowThreeFeatures |
+        ProtocolBitmaskFeature.PlatformComputeJob |
+        ProtocolBitmaskFeature.PlatformComputeExecutor |
+        ProtocolBitmaskFeature.StreamerReceiverExecutionOptions |
+        ProtocolBitmaskFeature.SqlPartitionAwareness |
+        ProtocolBitmaskFeature.SqlPartitionAwarenessTableName;
+
+    [Test]
+    public void TestToBytesWithSingleFeature() =>
+        CollectionAssert.AreEqual(new byte[] { 1, 0, 0, 0 }, ProtocolBitmaskFeature.UserAttributes.ToBytes());
+
+    [Test]
+    public void TestToBytesWithMultipleFeatures() =>
+        CollectionAssert.AreEqual(new byte[] { 7, 0, 0, 0 }, LowThreeFeatures.ToBytes());
+
+    [Test]
+    public void TestToBytesWithHighBitFeatures()
+    {
+        // Bit 16 = 65536 = 0x010000: the third byte in little-endian order.
+        var expected = new byte[] { 0, 0, 1, 0 };
+
+        CollectionAssert.AreEqual(expected, ProtocolBitmaskFeature.SqlPartitionAwarenessTableName.ToBytes());
+    }
+
+    [Test]
+    public void TestToBytesWithAllFeatures()
+    {
+        // 63 = bits 0..5, 2 = bit 9, 1 = bit 16.
+        var expected = new byte[] { 63, 2, 1, 0 };
+
+        CollectionAssert.AreEqual(expected, AllFeatures.ToBytes());
+    }
+
+    [Test]
+    public void TestFromBytesWithFourBytes()
+    {
+        var decoded = ProtocolBitmaskFeatureExtensions.FromBytes(new byte[] { 7, 0, 0, 0 });
+
+        Assert.AreEqual(LowThreeFeatures, decoded);
+    }
+
+    [Test]
+    public void TestFromBytesWithLessThanFourBytes()
+    {
+        var decoded = ProtocolBitmaskFeatureExtensions.FromBytes(new byte[] { 7 });
+
+        Assert.AreEqual(LowThreeFeatures, decoded);
+    }
+
+    [Test]
+    public void TestFromBytesWithTwoBytes()
+    {
+        var decoded = ProtocolBitmaskFeatureExtensions.FromBytes(new byte[] { 0, 2 });
+
+        Assert.AreEqual(ProtocolBitmaskFeature.SqlPartitionAwareness, decoded);
+    }
+
+    [Test]
+    public void TestFromBytesWithThreeBytes()
+    {
+        var decoded = ProtocolBitmaskFeatureExtensions.FromBytes(new byte[] { 0, 0, 1 });
+
+        Assert.AreEqual(ProtocolBitmaskFeature.SqlPartitionAwarenessTableName, decoded);
+    }
+
+    [Test]
+    public void TestFromBytesWithEmptyArray()
+    {
+        var decoded = ProtocolBitmaskFeatureExtensions.FromBytes(Array.Empty<byte>());
+
+        Assert.AreEqual((ProtocolBitmaskFeature)0, decoded);
+    }
+
+    [Test]
+    public void TestFromBytesWithMoreThanFourBytesThrows()
+    {
+        var tooLong = new byte[] { 1, 2, 3, 4, 5 };
+
+        var ex = Assert.Throws<InvalidOperationException>(() => ProtocolBitmaskFeatureExtensions.FromBytes(tooLong));
+        StringAssert.Contains("Invalid bitmask feature length: 5", ex!.Message);
+    }
+
+    [Test]
+    public void TestRoundTripAllFeatures()
+    {
+        var restored = ProtocolBitmaskFeatureExtensions.FromBytes(AllFeatures.ToBytes());
+
+        Assert.AreEqual(AllFeatures, restored);
+    }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
index eaf84a0d3bb..f9a050c9089 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
@@ -36,6 +36,11 @@ namespace Apache.Ignite
/// </summary>
public const int DefaultPort = 10800;
+ /// <summary>
+ /// Default SQL partition awareness metadata cache size.
+ /// </summary>
+ public const int DefaultSqlPartitionAwarenessMetadataCacheSize = 1024;
+
/// <summary>
/// Default socket timeout.
/// </summary>
@@ -102,6 +107,7 @@ namespace Apache.Ignite
SslStreamFactory = other.SslStreamFactory;
Authenticator = other.Authenticator;
ReResolveAddressesInterval = other.ReResolveAddressesInterval;
+ SqlPartitionAwarenessMetadataCacheSize =
other.SqlPartitionAwarenessMetadataCacheSize;
}
/// <summary>
@@ -212,5 +218,32 @@ namespace Apache.Ignite
/// See <see cref="BasicAuthenticator"/>.
/// </summary>
public IAuthenticator? Authenticator { get; set; }
+
+ /// <summary>
+ /// Gets or sets the size of cache to store partition awareness metadata of SQL queries, in number of entries.
+ /// Default is <see cref="DefaultSqlPartitionAwarenessMetadataCacheSize"/>.
+ /// <para />
+ /// Set to zero to disable SQL partition awareness.
+ /// </summary>
+ /// <remarks>
+ /// SQL partition awareness feature improves query performance by directing queries to the specific server nodes that hold the
+ /// relevant data, minimizing network overhead. Ignite client builds the metadata cache during the initial query execution and leverages
+ /// this cache to speed up subsequent queries.
+ /// <para />
+ /// In general, metadata is available for queries
+ /// which have equality predicate over all colocation columns, or which insert the whole tuple. For example:
+ /// <code>
+ /// // Create reservations table colocated by floor_no.
+ /// CREATE TABLE RoomsReservations (room_no INT, floor_no INT, PRIMARY KEY (room_no, floor_no)) COLOCATE BY (floor_no);
+ ///
+ /// // Select reserved rooms by floor_no - allows computing a partition and routing.
+ /// SELECT room_no FROM RoomsReservations WHERE floor_no = ?;
+ ///
+ /// // INSERT: parametrized by floor_no - allows computing a partition and routing.
+ /// INSERT INTO RoomsReservations(room_no, floor_no) VALUES(?, ?);
+ /// </code>
+ /// </remarks>
+ /// <value>Cache size, in number of entries.</value>
+ public int SqlPartitionAwarenessMetadataCacheSize { get; set; } =
DefaultSqlPartitionAwarenessMetadataCacheSize;
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index c96e1ea601a..307ba622f63 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -208,13 +208,45 @@ namespace Apache.Ignite.Internal
/// <param name="expectNotifications">Whether to expect notifications
as a result of the operation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Response data and socket.</returns>
- public async Task<(PooledBuffer Buffer, ClientSocket Socket)>
DoOutInOpAndGetSocketAsync(
+ public async Task<ClientResponse> DoOutInOpAndGetSocketAsync(
ClientOp clientOp,
Transaction? tx = null,
PooledArrayBuffer? request = null,
PreferredNode preferredNode = default,
IRetryPolicy? retryPolicyOverride = null,
bool expectNotifications = false,
+ CancellationToken cancellationToken = default) =>
+ await DoOutInOpAndGetSocketAsync(
+ clientOp,
+ tx,
+ request,
+ static (_, request0) => request0,
+ preferredNode,
+ retryPolicyOverride,
+ expectNotifications,
+ cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Performs an in-out operation and returns the socket along with the
response.
+ /// </summary>
+ /// <param name="clientOp">Operation.</param>
+ /// <param name="tx">Transaction.</param>
+ /// <param name="arg">Argument.</param>
+ /// <param name="requestWriter">Request writer.</param>
+ /// <param name="preferredNode">Preferred node.</param>
+ /// <param name="retryPolicyOverride">Retry policy.</param>
+ /// <param name="expectNotifications">Whether to expect notifications
as a result of the operation.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ /// <typeparam name="TArg">Arg type.</typeparam>
+ /// <returns>Response data and socket.</returns>
+ public async Task<ClientResponse> DoOutInOpAndGetSocketAsync<TArg>(
+ ClientOp clientOp,
+ Transaction? tx,
+ TArg arg,
+ Func<ClientSocket, TArg, PooledArrayBuffer?> requestWriter,
+ PreferredNode preferredNode = default,
+ IRetryPolicy? retryPolicyOverride = null,
+ bool expectNotifications = false,
CancellationToken cancellationToken = default)
{
if (tx != null)
@@ -225,19 +257,23 @@ namespace Apache.Ignite.Internal
}
// Use tx-specific socket without retry and failover.
+ using var request = requestWriter(tx.Socket, arg);
var buffer = await tx.Socket.DoOutInOpAsync(clientOp, request,
expectNotifications, cancellationToken).ConfigureAwait(false);
- return (buffer, tx.Socket);
+ return new ClientResponse(buffer, tx.Socket);
}
return await DoWithRetryAsync(
- (clientOp, request, expectNotifications, cancellationToken),
+ (clientOp, requestWriter, arg, expectNotifications,
cancellationToken),
static (_, arg) => arg.clientOp,
async static (socket, arg) =>
{
PooledBuffer res = await socket.DoOutInOpAsync(
- arg.clientOp, arg.request, arg.expectNotifications,
arg.cancellationToken).ConfigureAwait(false);
+ clientOp: arg.clientOp,
+ request: arg.requestWriter(socket, arg.arg),
+ expectNotifications: arg.expectNotifications,
+ cancellationToken:
arg.cancellationToken).ConfigureAwait(false);
- return (Buffer: res, Socket: socket);
+ return new ClientResponse(Buffer: res, Socket: socket);
},
preferredNode,
retryPolicyOverride)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientResponse.cs
similarity index 52%
copy from
modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
copy to modules/platforms/dotnet/Apache.Ignite/Internal/ClientResponse.cs
index 45a6509cfab..fed8b3ad5d5 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientResponse.cs
@@ -15,43 +15,29 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto;
+namespace Apache.Ignite.Internal;
using System;
+using Buffers;
/// <summary>
-/// Protocol bitmask features.
+/// Client response. Contains the buffer with response data and the socket it
was read from.
/// </summary>
-[Flags]
-internal enum ProtocolBitmaskFeature
+/// <param name="Buffer">Response buffer.</param>
+/// <param name="Socket">Socket.</param>
+internal readonly record struct ClientResponse(PooledBuffer Buffer,
ClientSocket Socket) : IDisposable
{
- /// <summary>
- /// User attributes in handshake.
- /// </summary>
- UserAttributes = 1,
-
- /// <summary>
- /// Qualified name table requests.
- /// </summary>
- TableReqsUseQualifiedName = 2,
-
- /// <summary>
- /// Transaction direct mapping.
- /// </summary>
- TxDirectMapping = 4,
-
- /// <summary>
- /// Platform compute jobs (call non-Java jobs from client).
- /// </summary>
- PlatformComputeJob = 8,
-
- /// <summary>
- /// Platform compute executor (respond to server calls for job execution).
- /// </summary>
- PlatformComputeExecutor = 16,
+ /// <inheritdoc/>
+ public void Dispose() => Buffer.Dispose();
/// <summary>
- /// Streamer receiver execution options, including .NET receivers.
+ /// Deconstructs this instance.
/// </summary>
- StreamerReceiverExecutionOptions = 32
+ /// <param name="buffer">Buffer.</param>
+ /// <param name="socket">Socket.</param>
+ public void Deconstruct(out PooledBuffer buffer, out ClientSocket socket)
+ {
+ buffer = Buffer;
+ socket = Socket;
+ }
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index e4f92b2ab8c..4c4484f0667 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -51,10 +51,12 @@ namespace Apache.Ignite.Internal
ProtocolBitmaskFeature.TableReqsUseQualifiedName |
ProtocolBitmaskFeature.PlatformComputeJob |
ProtocolBitmaskFeature.PlatformComputeExecutor |
- ProtocolBitmaskFeature.StreamerReceiverExecutionOptions;
+ ProtocolBitmaskFeature.StreamerReceiverExecutionOptions |
+ ProtocolBitmaskFeature.SqlPartitionAwareness |
+ ProtocolBitmaskFeature.SqlPartitionAwarenessTableName;
- /** Features as byte array */
- private static readonly byte[] FeatureBytes = [(byte)Features];
+ /** Features as a byte array */
+ private static readonly byte[] FeatureBytes = Features.ToBytes();
/** Version 3.0.0. */
private static readonly ClientProtocolVersion Ver300 = new(3, 0, 0);
@@ -431,10 +433,7 @@ namespace Apache.Ignite.Internal
reader.Skip(); // Patch.
reader.Skip(); // Pre-release.
- ReadOnlySpan<byte> featureBits = reader.ReadBinary();
- ProtocolBitmaskFeature features = featureBits.Length > 0
- ? (ProtocolBitmaskFeature)featureBits[0] // Only one byte is
used for now.
- : 0;
+ ProtocolBitmaskFeature features =
ProtocolBitmaskFeatureExtensions.FromBytes(reader.ReadBinary());
int extensionMapSize = reader.ReadInt32();
for (int i = 0; i < extensionMapSize; i++)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ConcurrentCache.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ConcurrentCache.cs
new file mode 100644
index 00000000000..9018ce71a51
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Common/ConcurrentCache.cs
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Common;
+
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
+/// <summary>
+/// Concurrent cache with eviction based on capacity.
+/// </summary>
+/// <typeparam name="TKey">Key type.</typeparam>
+/// <typeparam name="TValue">Value type.</typeparam>
+internal sealed class ConcurrentCache<TKey, TValue>
+    where TKey : notnull
+{
+    /** Maximum number of entries left in the map once an eviction pass completes. */
+    private readonly int _capacity;
+
+    /** Backing storage. */
+    private readonly ConcurrentDictionary<TKey, Entry> _map;
+
+    /** Eviction cursor over the map; also used as the lock object that serializes eviction passes. */
+    private readonly IEnumerator<KeyValuePair<TKey, Entry>> _hand;
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="ConcurrentCache{TKey, TValue}"/> class.
+    /// </summary>
+    /// <param name="capacity">Maximum capacity. Must not be negative.</param>
+    /// <exception cref="System.ArgumentOutOfRangeException">When <paramref name="capacity"/> is negative.</exception>
+    public ConcurrentCache(int capacity)
+    {
+        if (capacity < 0)
+        {
+            // A negative capacity can never be satisfied: the eviction loop would drain the map
+            // and then spin forever (Count == 0 is still greater than capacity) while holding the lock.
+            throw new System.ArgumentOutOfRangeException(nameof(capacity), capacity, "Capacity must not be negative.");
+        }
+
+        _capacity = capacity;
+        _map = new ConcurrentDictionary<TKey, Entry>();
+        _hand = _map.GetEnumerator();
+    }
+
+    /// <summary>
+    /// Adds the specified key and value to the cache. Evicts old entries if capacity is exceeded.
+    /// </summary>
+    /// <param name="key">Key.</param>
+    /// <param name="value">Value.</param>
+    /// <returns>True if the entry was added, false if an entry with the same key already exists.</returns>
+    public bool TryAdd(TKey key, TValue value)
+    {
+        var added = _map.TryAdd(key, new Entry(value));
+
+        if (added)
+        {
+            // Only a successful insert can push the map over capacity.
+            EvictIfNeeded();
+        }
+
+        return added;
+    }
+
+    /// <summary>
+    /// Gets the value associated with the specified key and marks the entry as visited.
+    /// </summary>
+    /// <param name="key">Key.</param>
+    /// <returns>Value, or default(TValue) if not exists.</returns>
+    public TValue? GetValueOrDefault(TKey key)
+    {
+        if (!_map.TryGetValue(key, out var entry))
+        {
+            return default;
+        }
+
+        // Visited entries are skipped once by the eviction pass before becoming candidates again.
+        entry.Visited = true;
+        return entry.Value;
+    }
+
+    /// <summary>
+    /// Removes entries until the map size is within capacity.
+    /// </summary>
+    private void EvictIfNeeded()
+    {
+        if (_map.Count <= _capacity)
+        {
+            return;
+        }
+
+        lock (_hand)
+        {
+            // Avoid infinite loop, evict any entry after a full cycle.
+            int retries = _capacity;
+
+            while (_map.Count > _capacity)
+            {
+                // SIEVE-like eviction.
+                if (!_hand.MoveNext())
+                {
+                    // Reached the end of the enumeration: wrap around and keep scanning.
+                    _hand.Reset();
+                    continue;
+                }
+
+                var current = _hand.Current;
+
+                if (retries-- > 0 && current.Value.Visited)
+                {
+                    // Second chance: clear the flag and move on to the next entry.
+                    current.Value.Visited = false;
+                    continue;
+                }
+
+                _map.TryRemove(current.Key, out _);
+            }
+        }
+    }
+
+    /// <summary>
+    /// Cache entry: the stored value plus a volatile "visited" flag used by eviction.
+    /// </summary>
+    private sealed record Entry(TValue Value)
+    {
+        private volatile bool _visited;
+
+        public bool Visited
+        {
+            get => _visited;
+            set => _visited = value;
+        }
+    }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index ebe28590030..f9ef18ddb39 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -43,8 +43,10 @@ namespace Apache.Ignite.Internal
{
Socket = socket;
- var sql = new Sql.Sql(socket);
- var tables = new Tables(socket, sql);
+ // TODO IGNITE-27846 Extract TableCache and avoid circular
dependency.
+ var tables = new Tables(socket);
+ var sql = new Sql.Sql(socket, tables);
+ tables.Sql = sql;
Tables = tables;
Transactions = new Transactions.Transactions(socket);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index f8df78970eb..6d9ce0a5c70 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -124,6 +124,15 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
return hash;
}
+        /// <summary>
+        /// Gets all hash codes in the order defined by <see cref="IHashedColumnIndexProvider"/>.
+        /// The number of hash codes is defined by <see cref="IHashedColumnIndexProvider.HashedColumnCount"/>.
+        /// </summary>
+        /// <returns>Span with hash codes.</returns>
+        /// <exception cref="InvalidOperationException">When no hashed columns predicate was provided.</exception>
+        public ReadOnlySpan<int> GetHashes()
+        {
+            if (_hashedColumnsPredicate == null)
+            {
+                throw new InvalidOperationException("No hash codes were reserved in the buffer.");
+            }
+
+            return GetHashSpan();
+        }
+
/// <summary>
/// Appends a null value.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
index 45a6509cfab..e75d6b81100 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeature.cs
@@ -53,5 +53,15 @@ internal enum ProtocolBitmaskFeature
/// <summary>
/// Streamer receiver execution options, including .NET receivers.
/// </summary>
- StreamerReceiverExecutionOptions = 32
+ StreamerReceiverExecutionOptions = 32,
+
+ /// <summary>
+ /// Partition awareness for SQL requests.
+ /// </summary>
+ SqlPartitionAwareness = 1 << 9,
+
+ /// <summary>
+ /// Partition awareness for SQL requests with table name in metadata.
+ /// </summary>
+ SqlPartitionAwarenessTableName = 1 << 16
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeatureExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeatureExtensions.cs
new file mode 100644
index 00000000000..d43551c96b5
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtocolBitmaskFeatureExtensions.cs
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto;
+
+using System;
+using System.Buffers.Binary;
+
+/// <summary>
+/// Extensions for <see cref="ProtocolBitmaskFeature"/>.
+/// </summary>
+internal static class ProtocolBitmaskFeatureExtensions
+{
+    /** Size of the serialized bitmask, in bytes. */
+    private const int BitmaskSize = sizeof(int);
+
+    /// <summary>
+    /// Serializes the feature flags into a fixed-size byte array, least significant byte first.
+    /// </summary>
+    /// <param name="feature">Flags.</param>
+    /// <returns>Bytes.</returns>
+    public static byte[] ToBytes(this ProtocolBitmaskFeature feature)
+    {
+        // Little-endian, to match BitSet.valueOf on the server.
+        var result = new byte[BitmaskSize];
+        BinaryPrimitives.WriteInt32LittleEndian(result, (int)feature);
+
+        return result;
+    }
+
+    /// <summary>
+    /// Deserializes feature flags from up to four little-endian bytes; missing high bytes are treated as zero.
+    /// </summary>
+    /// <param name="bytes">Bytes.</param>
+    /// <returns>Flags.</returns>
+    /// <exception cref="InvalidOperationException">When more than four bytes are supplied.</exception>
+    public static ProtocolBitmaskFeature FromBytes(ReadOnlySpan<byte> bytes)
+    {
+        if (bytes.Length > BitmaskSize)
+        {
+            throw new InvalidOperationException("Invalid bitmask feature length: " + bytes.Length);
+        }
+
+        // Copy into a zeroed scratch buffer so that shorter inputs are padded with zeros.
+        Span<byte> padded = stackalloc byte[BitmaskSize];
+        padded.Clear();
+        bytes.CopyTo(padded);
+
+        // BitSet.valueOf on the server is little-endian.
+        return (ProtocolBitmaskFeature)BinaryPrimitives.ReadInt32LittleEndian(padded);
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
index 798079578b1..7e425eca7a0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Internal.Sql
using Buffers;
using Common;
using Ignite.Sql;
+ using Ignite.Table;
using Proto;
using Proto.BinaryTuple;
using Proto.MsgPack;
@@ -61,21 +62,22 @@ namespace Apache.Ignite.Internal.Sql
/// <summary>
/// Initializes a new instance of the <see cref="ResultSet{T}"/> class.
/// </summary>
- /// <param name="socket">Socket.</param>
- /// <param name="buf">Buffer to read initial data from.</param>
+ /// <param name="response">Response.</param>
/// <param name="rowReaderFactory">Row reader factory.</param>
/// <param name="rowReaderArg">Row reader argument.</param>
+ /// <param name="partitionMetadataExpected">Whether partition metadata
is expected from the server.</param>
/// <param name="cancellationToken">Cancellation token.</param>
public ResultSet(
- ClientSocket socket,
- PooledBuffer buf,
+ ClientResponse response,
RowReaderFactory<T> rowReaderFactory,
object? rowReaderArg,
+ bool partitionMetadataExpected,
CancellationToken cancellationToken)
{
- _socket = socket;
+ _socket = response.Socket;
_cancellationToken = cancellationToken;
+ var buf = response.Buffer;
var reader = buf.GetReader();
// ReSharper disable once RedundantCast (required on .NET Core
3.1).
@@ -85,8 +87,10 @@ namespace Apache.Ignite.Internal.Sql
_hasMorePages = reader.ReadBoolean();
WasApplied = reader.ReadBoolean();
AffectedRows = reader.ReadInt64();
+ _metadata = ReadMeta(ref reader);
+ PartitionAwarenessMetadata = ReadPartitionAwarenessMetadata(
+ response.Socket.ConnectionContext, ref reader,
partitionMetadataExpected);
- _metadata = HasRowSet ? ReadMeta(ref reader) : null;
_rowReader = _metadata != null ? rowReaderFactory(_metadata) :
null;
_rowReaderArg = rowReaderArg;
@@ -134,6 +138,11 @@ namespace Apache.Ignite.Internal.Sql
/// </summary>
internal bool HasRows { get; }
+ /// <summary>
+ /// Gets the partition awareness metadata, if available.
+ /// </summary>
+ internal SqlPartitionAwarenessMetadata? PartitionAwarenessMetadata {
get; }
+
/// <inheritdoc/>
public async ValueTask<List<T>> ToListAsync() =>
await CollectAsync(
@@ -296,9 +305,13 @@ namespace Apache.Ignite.Internal.Sql
}
}
- private static ResultSetMetadata ReadMeta(ref MsgPackReader reader)
+ private static ResultSetMetadata? ReadMeta(ref MsgPackReader reader)
{
var size = reader.ReadInt32();
+ if (size == 0)
+ {
+ return null;
+ }
var columns = new ColumnMetadata[size];
@@ -328,6 +341,47 @@ namespace Apache.Ignite.Internal.Sql
return new ResultSetMetadata(columns);
}
+        // Reads optional SQL partition awareness metadata from the response.
+        // Returns null when the metadata was not requested, the server lacks the feature,
+        // the server wrote nil, or the table name is not part of the payload.
+        private static SqlPartitionAwarenessMetadata? ReadPartitionAwarenessMetadata(
+            ConnectionContext ctx, ref MsgPackReader reader, bool partitionMetadataExpected)
+        {
+            var metadataWritten = ctx.ServerHasFeature(ProtocolBitmaskFeature.SqlPartitionAwareness) && partitionMetadataExpected;
+
+            // The reader is only touched when the metadata slot is present in the response.
+            if (!metadataWritten || reader.TryReadNil())
+            {
+                return null;
+            }
+
+            var tableId = reader.ReadInt32();
+
+            QualifiedName? tableName = null;
+            if (ctx.ServerHasFeature(ProtocolBitmaskFeature.SqlPartitionAwarenessTableName))
+            {
+                tableName = QualifiedName.FromNormalizedInternal(reader.ReadStringNullable(), reader.ReadString());
+            }
+
+            // Both arrays are consumed even if the result is discarded below.
+            var indexes = ReadInts(ref reader);
+            var hash = ReadInts(ref reader);
+
+            // Table name is required for caching. Return null if not available.
+            if (tableName == null)
+            {
+                return null;
+            }
+
+            return new SqlPartitionAwarenessMetadata(tableId, tableName, indexes, hash);
+
+            static int[] ReadInts(ref MsgPackReader r)
+            {
+                var items = new int[r.ReadInt32()];
+
+                for (var idx = 0; idx < items.Length; idx++)
+                {
+                    items[idx] = r.ReadInt32();
+                }
+
+                return items;
+            }
+        }
+
private T ReadRow(ref MsgPackReader reader)
{
var tupleReader = new BinaryTupleReader(reader.ReadBinary(),
_metadata!.Columns.Count);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index b89451b50e2..05837dfd27b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -33,6 +33,7 @@ namespace Apache.Ignite.Internal.Sql
using Proto;
using Proto.BinaryTuple;
using Proto.MsgPack;
+ using Table;
using Table.Serialization;
using Transactions;
@@ -49,13 +50,23 @@ namespace Apache.Ignite.Internal.Sql
/** Underlying connection. */
private readonly ClientFailoverSocket _socket;
+ private readonly Tables _tables;
+
+ /** Partition awareness mapping cache, keyed by (schema, query). */
+ private readonly ConcurrentCache<(string? Schema, string Query),
SqlPartitionMappingProvider>? _paMappingCache;
+
/// <summary>
/// Initializes a new instance of the <see cref="Sql"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
- public Sql(ClientFailoverSocket socket)
+ /// <param name="tables">Tables.</param>
+ public Sql(ClientFailoverSocket socket, Tables tables)
{
_socket = socket;
+ _tables = tables;
+
+ var cacheSize =
socket.Configuration.Configuration.SqlPartitionAwarenessMetadataCacheSize;
+ _paMappingCache = cacheSize > 0 ? new(capacity: cacheSize) : null;
}
/// <inheritdoc/>
@@ -274,20 +285,58 @@ namespace Apache.Ignite.Internal.Sql
IgniteArgumentCheck.NotNull(statement);
cancellationToken.ThrowIfCancellationRequested();
- Transaction? tx = await
LazyTransaction.EnsureStartedAsync(transaction, _socket,
default).ConfigureAwait(false);
+
+ // Look up cached PA mapping to route the query to the preferred
node.
+ var paKey = (statement.Schema, statement.Query);
+ PreferredNode preferredNode = default;
+ bool requestPaMeta = _paMappingCache != null;
+
+ if (_paMappingCache?.GetValueOrDefault(paKey) is { }
mappingProvider)
+ {
+ requestPaMeta = false;
+ preferredNode = await
mappingProvider.GetPreferredNode(args).ConfigureAwait(false);
+ }
+
+ Transaction? tx = await
LazyTransaction.EnsureStartedAsync(transaction, _socket,
preferredNode).ConfigureAwait(false);
using var bufferWriter = ProtoCommon.GetMessageWriter();
- WriteStatement(bufferWriter, statement, args, tx, writeTx: true);
+ var writerArg = (Sql: this, bufferWriter, statement, args, tx,
requestPaMeta);
PooledBuffer? buf = null;
try
{
- (buf, var socket) = await _socket.DoOutInOpAndGetSocketAsync(
- ClientOp.SqlExec, tx, bufferWriter, cancellationToken:
cancellationToken).ConfigureAwait(false);
+ var response = await _socket.DoOutInOpAndGetSocketAsync(
+ clientOp: ClientOp.SqlExec,
+ tx: tx,
+ arg: writerArg,
+ requestWriter: static (socket, arg0) =>
+ {
+ var reqBuf = arg0.bufferWriter;
+ reqBuf.Reset();
- // ResultSet will dispose the pooled buffer.
- return new ResultSet<T>(socket, buf, rowReaderFactory,
rowReaderArg, cancellationToken);
+ var enablePartitionAwareness = arg0.requestPaMeta &&
+
socket.ConnectionContext.ServerHasFeature(ProtocolBitmaskFeature.SqlPartitionAwareness);
+
+ arg0.Sql.WriteStatement(reqBuf, arg0.statement,
arg0.args, arg0.tx, writeTx: true, enablePartitionAwareness);
+
+ return reqBuf;
+ },
+ preferredNode: preferredNode,
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ // ResultSet will dispose of the pooled buffer.
+ buf = response.Buffer;
+ var resultSet = new ResultSet<T>(response, rowReaderFactory,
rowReaderArg, requestPaMeta, cancellationToken);
+
+ if (resultSet.PartitionAwarenessMetadata is { } paMeta)
+ {
+ var table =
_tables.GetOrCreateCachedTableInternal(paMeta.TableId, paMeta.TableName);
+ _paMappingCache?.TryAdd(paKey, new
SqlPartitionMappingProvider(paMeta, table));
+ }
+
+ return resultSet;
}
catch (SqlException e)
{
@@ -422,7 +471,8 @@ namespace Apache.Ignite.Internal.Sql
SqlStatement statement,
ICollection<object?>? args,
Transaction? tx = null,
- bool writeTx = false)
+ bool writeTx = false,
+ bool enablePartitionAwareness = false)
{
var w = writer.MessageWriter;
@@ -430,6 +480,8 @@ namespace Apache.Ignite.Internal.Sql
w.WriteObjectCollectionWithCountAsBinaryTuple(args);
w.Write(_socket.ObservableTimestamp);
+
+ w.Write(enablePartitionAwareness);
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/SqlPartitionAwarenessMetadata.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/SqlPartitionAwarenessMetadata.cs
new file mode 100644
index 00000000000..69773c2f79e
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/SqlPartitionAwarenessMetadata.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Sql;
+
+using System;
+using Ignite.Table;
+using Proto.BinaryTuple;
+
+/// <summary>
+/// Partition awareness metadata returned by the server for SQL queries.
+/// <param name="TableId">Table id.</param>
+/// <param name="TableName">Table name.</param>
+/// <param name="Indexes">Colocation key element sources.
+/// <para/>
+/// The <see cref="Indexes"/> array describes how each element of a colocation
key should be interpreted during evaluation:
+/// <list type="bullet">
+/// <item>If <c>indexes[i] >= 0</c>, then the value at position <c>i</c> in
the colocation key should be taken
+/// from a dynamic parameter at index <c>indexes[i]</c>.</item>
+/// <item>If <c>indexes[i] &lt; 0</c>, then the value at position <c>i</c> is
a constant literal whose precomputed
+/// hash is stored in the <see cref="Hash"/> array at index <c>-(indexes[i] +
1)</c>.</item>
+/// </list>
+/// </param>
+/// <param name="Hash">Precomputed hash values for constant colocation key
elements.</param>
+/// </summary>
+internal sealed record SqlPartitionAwarenessMetadata(
+ int TableId,
+ QualifiedName TableName,
+ ReadOnlyMemory<int> Indexes,
+ ReadOnlyMemory<int> Hash) : IHashedColumnIndexProvider
+{
+ /// <inheritdoc/>
+ public int HashedColumnCount => Indexes.Length;
+
+ /// <inheritdoc/>
+ public int HashedColumnOrder(int index) => index;
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/SqlPartitionMappingProvider.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/SqlPartitionMappingProvider.cs
new file mode 100644
index 00000000000..8d78e42a00c
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/SqlPartitionMappingProvider.cs
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Sql;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Proto;
+using Proto.BinaryTuple;
+using Table;
+
+/// <summary>
+/// SQL partition mapping provider.
+/// </summary>
+internal sealed record
SqlPartitionMappingProvider(SqlPartitionAwarenessMetadata Meta, Table Table)
+{
+ /// <summary>
+ /// Gets the preferred node for routing based on the given query
parameters.
+ /// </summary>
+ /// <param name="args">Query parameter values.</param>
+ /// <returns>Preferred node, or default if mapping cannot be computed
yet.</returns>
+ public async ValueTask<PreferredNode>
GetPreferredNode(ICollection<object?>? args)
+ {
+ // Both async calls return cached results if available, no need to
cache here.
+ var schema = await
Table.GetSchemaAsync(Table.SchemaVersionUnknown).ConfigureAwait(false);
+ var assignments = await
Table.GetPartitionAssignmentAsync().ConfigureAwait(false);
+
+ return GetPreferredNodeInternal(args, schema, assignments);
+ }
+
+ private PreferredNode GetPreferredNodeInternal(ICollection<object?>? args,
Schema schema, string?[] assignments)
+ {
+ var indexes = Meta.Indexes.Span;
+ var colocationColumns = schema.ColocationColumns;
+
+ if (colocationColumns.Length != indexes.Length)
+ {
+ return default;
+ }
+
+ IList<object?> args0 = args as IList<object?> ?? args?.ToArray() ?? [];
+
+ using var tupleBuilder = new BinaryTupleBuilder(
+ numElements: colocationColumns.Length,
+ hashedColumnsPredicate: Meta);
+
+ // First pass: append values for non-constant columns.
+ for (int i = 0; i < colocationColumns.Length; i++)
+ {
+ int idx = indexes[i];
+ if (idx < 0)
+ {
+ tupleBuilder.AppendNull(); // Skip and advance.
+ continue;
+ }
+
+ if (idx >= args0.Count)
+ {
+ // Not enough arguments to determine partition.
+ return default;
+ }
+
+ Column column = colocationColumns[i];
+ object? arg = args0[idx];
+
+ tupleBuilder.AppendObject(arg, column.Type, column.Scale,
column.Precision);
+ }
+
+ // Second pass: combine hashes for constant and written columns.
+ int colocationHash = 0;
+ var writtenHashes = tupleBuilder.GetHashes();
+
+ for (int i = 0; i < colocationColumns.Length; i++)
+ {
+ int idx = indexes[i];
+
+ var valueHash = idx < 0
+ ? Meta.Hash.Span[-(idx + 1)]
+ : writtenHashes[i];
+
+ colocationHash = HashUtils.Combine(colocationHash, valueHash);
+ }
+
+ int partition = Math.Abs(colocationHash % assignments.Length);
+ var node = assignments[partition];
+
+ return node == null ? default : PreferredNode.FromName(node);
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
index dc8a742b884..4d3922062e5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Schema.cs
@@ -34,6 +34,7 @@ namespace Apache.Ignite.Internal.Table
/// <param name="Columns">Columns in schema order.</param>
/// <param name="KeyColumns">Key part columns.</param>
/// <param name="ValColumns">Val part columns.</param>
+ /// <param name="ColocationColumns">Colocation columns.</param>
/// <param name="ColumnsByName">Column name map.</param>
/// <param name="HashedColumnIndexProvider">Hashed column index
provider.</param>
/// <param name="KeyOnlyHashedColumnIndexProvider">Hashed column index
provider for key-only mode.</param>
@@ -45,6 +46,7 @@ namespace Apache.Ignite.Internal.Table
Column[] Columns,
Column[] KeyColumns,
Column[] ValColumns,
+ Column[] ColocationColumns,
IReadOnlyDictionary<string, Column> ColumnsByName,
IHashedColumnIndexProvider HashedColumnIndexProvider,
IHashedColumnIndexProvider KeyOnlyHashedColumnIndexProvider)
@@ -108,9 +110,24 @@ namespace Apache.Ignite.Internal.Table
Debug.Assert(columns.Length == 0 || colocationColumnCount > 0, "No
hashed columns");
var columnMap = new Dictionary<string, Column>(columns.Length);
+ var colocationColumns = colocationColumnCount > 0 ? new
Column[colocationColumnCount] : keyColumns;
+
foreach (var column in columns)
{
columnMap[IgniteTupleCommon.ParseColumnName(column.Name)] =
column;
+
+ if (column.ColocationIndex >= 0)
+ {
+ Debug.Assert(
+ column.ColocationIndex < colocationColumnCount,
+ $"Invalid colocation index: {column},
colocationColumnCount={colocationColumnCount},
schema={columns[column.ColocationIndex]}");
+
+ Debug.Assert(
+ colocationColumns[column.ColocationIndex] == null!,
+ $"Duplicate colocation index: {column},
{colocationColumns[column.ColocationIndex]}");
+
+ colocationColumns[column.ColocationIndex] = column;
+ }
}
return new Schema(
@@ -120,6 +137,7 @@ namespace Apache.Ignite.Internal.Table
columns,
keyColumns,
valColumns,
+ colocationColumns,
columnMap,
new HashedColumnIndexProvider(columns, colocationColumnCount),
new HashedColumnIndexProvider(keyColumns,
colocationColumnCount));
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
index 13777c6114f..33c8fd40293 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
@@ -34,9 +34,6 @@ namespace Apache.Ignite.Internal.Table
/** Socket. */
private readonly ClientFailoverSocket _socket;
- /** SQL. */
- private readonly Sql _sql;
-
/** Cached tables. Caching here is required to retain schema and
serializer caches in <see cref="Table"/>. */
private readonly ConcurrentDictionary<int, Table> _cachedTables =
new();
@@ -44,12 +41,12 @@ namespace Apache.Ignite.Internal.Table
/// Initializes a new instance of the <see cref="Tables"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
- /// <param name="sql">Sql.</param>
- public Tables(ClientFailoverSocket socket, Sql sql)
- {
- _socket = socket;
- _sql = sql;
- }
+ public Tables(ClientFailoverSocket socket) => _socket = socket;
+
+ /// <summary>
+ /// Gets or sets the SQL API.
+ /// </summary>
+ public Sql Sql { get; set; } = null!;
/// <inheritdoc/>
public async Task<ITable?> GetTableAsync(string name) =>
@@ -85,12 +82,7 @@ namespace Apache.Ignite.Internal.Table
var id = r.ReadInt32();
var qualifiedName = UnpackQualifiedName(ref r,
packedAsQualified);
- var table = tables._cachedTables.GetOrAdd(
- id,
- static (int id0, (QualifiedName QualifiedName, Tables
Tables) arg) =>
- new Table(arg.QualifiedName, id0,
arg.Tables._socket, arg.Tables._sql),
- (qualifiedName, tables));
-
+ var table = tables.GetOrCreateCachedTableInternal(id,
qualifiedName);
res.Add(table);
}
@@ -152,17 +144,26 @@ namespace Apache.Ignite.Internal.Table
var tableId = r.ReadInt32();
var actualName = UnpackQualifiedName(ref r, op ==
ClientOp.TableGetQualified);
- return tables._cachedTables.GetOrAdd(
- tableId,
- static (int id, (QualifiedName ActualName, Tables Tables)
arg) =>
- new Table(arg.ActualName, id, arg.Tables._socket,
arg.Tables._sql),
- (actualName, tables));
+ return tables.GetOrCreateCachedTableInternal(tableId,
actualName);
}
static ClientOp Op(ClientSocket? socket) =>
UseQualifiedNames(socket) ? ClientOp.TableGetQualified :
ClientOp.TableGet;
}
+ /// <summary>
+ /// Gets or creates a cached table.
+ /// </summary>
+ /// <param name="id">Table id.</param>
+ /// <param name="qualifiedName">Table name.</param>
+ /// <returns>Table instance.</returns>
+ internal Table GetOrCreateCachedTableInternal(int id, QualifiedName
qualifiedName) =>
+ _cachedTables.GetOrAdd(
+ key: id,
+ valueFactory: static (int id0, (QualifiedName QualifiedName,
Tables Tables) arg) =>
+ new Table(arg.QualifiedName, id0, arg.Tables._socket,
arg.Tables.Sql),
+ factoryArgument: (qualifiedName, this));
+
private static QualifiedName UnpackQualifiedName(ref MsgPackReader r,
bool packedAsQualified)
{
if (packedAsQualified)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Sql/IgniteDbConnectionStringBuilder.cs
b/modules/platforms/dotnet/Apache.Ignite/Sql/IgniteDbConnectionStringBuilder.cs
index 27cffd343fb..92ec5536134 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Sql/IgniteDbConnectionStringBuilder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Sql/IgniteDbConnectionStringBuilder.cs
@@ -44,7 +44,8 @@ public sealed class IgniteDbConnectionStringBuilder :
DbConnectionStringBuilder
nameof(SslEnabled),
nameof(Username),
nameof(Password),
- nameof(ReResolveAddressesInterval)
+ nameof(ReResolveAddressesInterval),
+ nameof(SqlPartitionAwarenessMetadataCacheSize)
};
/// <summary>
@@ -160,6 +161,17 @@ public sealed class IgniteDbConnectionStringBuilder :
DbConnectionStringBuilder
set => this[nameof(ReResolveAddressesInterval)] = value.ToString();
}
+ /// <summary>
+ /// Gets or sets the SQL partition awareness metadata cache size. See <see
cref="IgniteClientConfiguration.SqlPartitionAwarenessMetadataCacheSize"/> for
more details.
+ /// </summary>
+ public int SqlPartitionAwarenessMetadataCacheSize
+ {
+ get => GetString(nameof(SqlPartitionAwarenessMetadataCacheSize)) is {
} s
+ ? int.Parse(s, CultureInfo.InvariantCulture)
+ :
IgniteClientConfiguration.DefaultSqlPartitionAwarenessMetadataCacheSize;
+ set => this[nameof(SqlPartitionAwarenessMetadataCacheSize)] =
value.ToString(CultureInfo.InvariantCulture);
+ }
+
/// <inheritdoc />
[AllowNull]
public override object this[string keyword]
@@ -194,7 +206,8 @@ public sealed class IgniteDbConnectionStringBuilder :
DbConnectionStringBuilder
Username = Username ?? string.Empty,
Password = Password ?? string.Empty
},
- ReResolveAddressesInterval = ReResolveAddressesInterval
+ ReResolveAddressesInterval = ReResolveAddressesInterval,
+ SqlPartitionAwarenessMetadataCacheSize =
SqlPartitionAwarenessMetadataCacheSize
};
}
diff --git
a/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadata.java
b/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadata.java
index bee2e9b3e92..0000227e46f 100644
---
a/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadata.java
+++
b/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadata.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.sql.engine.prepare.partitionawareness;
import java.util.Arrays;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.table.QualifiedName;
/**
* Partition awareness metadata.
@@ -49,6 +50,8 @@ public final class PartitionAwarenessMetadata {
private final DirectTxMode directTxMode;
+ private final QualifiedName tableName;
+
/**
* Constructor.
*
@@ -56,17 +59,20 @@ public final class PartitionAwarenessMetadata {
* @param indexes Mapping between positions in colocation key and dynamic
parameters.
* @param hash Array of computed hashes.
* @param directTxMode The level of support for direct transaction.
+ * @param tableName Table name.
*/
public PartitionAwarenessMetadata(
int tableId,
int[] indexes,
int[] hash,
- DirectTxMode directTxMode
+ DirectTxMode directTxMode,
+ QualifiedName tableName
) {
this.tableId = tableId;
this.indexes = indexes;
this.hash = hash;
this.directTxMode = directTxMode;
+ this.tableName = tableName;
}
/** Return table id. */
@@ -74,6 +80,11 @@ public final class PartitionAwarenessMetadata {
return tableId;
}
+ /** Return table name. */
+ public QualifiedName tableName() {
+ return tableName;
+ }
+
/** Returns the number of colocation key columns. */
public int size() {
return indexes.length;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
index a0de72232fd..202dd5e85df 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataExtractor.java
@@ -46,6 +46,8 @@ import
org.apache.ignite.internal.sql.engine.util.RexUtils.FaultyContext;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.util.ColocationUtils;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.QualifiedNameHelper;
import org.jetbrains.annotations.Nullable;
/**
@@ -162,7 +164,14 @@ public class PartitionAwarenessMetadataExtractor {
int[] hash = hashFields.toIntArray();
- return new PartitionAwarenessMetadata(igniteTable.id(), indexes, hash,
directTxMode);
+ return new PartitionAwarenessMetadata(igniteTable.id(), indexes, hash,
directTxMode, qualifiedName(optTable));
+ }
+
+ private static QualifiedName qualifiedName(RelOptTable optTable) {
+ List<String> nameParts = optTable.getQualifiedName();
+ assert nameParts.size() >= 2 : "Invalid qualified name: " + nameParts;
+
+ return QualifiedNameHelper.fromNormalized(nameParts.get(0),
nameParts.get(1));
}
private static @Nullable PartitionAwarenessMetadata
tryConvertPartitionPruningMetadata(
@@ -235,7 +244,7 @@ public class PartitionAwarenessMetadataExtractor {
int[] hash = hashFields.toIntArray();
- return new PartitionAwarenessMetadata(igniteTable.id(), indexes, hash,
directTxMode);
+ return new PartitionAwarenessMetadata(igniteTable.id(), indexes, hash,
directTxMode, qualifiedName(optTable));
}
private static long numberOfModifyAndSourceRels(RelWithSources
relationWithSources) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
index 6e2c3fb67c4..4f387b1fe4b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/partitionawareness/PartitionAwarenessMetadataTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.ColocationUtils;
+import org.apache.ignite.table.QualifiedName;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -357,7 +358,7 @@ public class PartitionAwarenessMetadataTest extends
BaseIgniteAbstractTest {
for (int i = 0; i < toHash.length; ++i) {
hashes[i] = ColocationUtils.hash(toHash[i], NativeTypes.INT32);
}
- return new PartitionAwarenessMetadata(1, dynamicParams, hashes, mode);
+ return new PartitionAwarenessMetadata(1, dynamicParams, hashes, mode,
QualifiedName.parse("PUBLIC.TBL"));
}
private static PartitionAwarenessMetadata metaTrackingRequired(int[]
dynamicParams, int[] toHash) {