This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 799530bbc2 IGNITE-17969 .NET: Partition Awareness - support all key
types (#1260)
799530bbc2 is described below
commit 799530bbc28cd71c2debc78868ca52836f08d0d5
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Oct 31 08:32:52 2022 +0300
IGNITE-17969 .NET: Partition Awareness - support all key types (#1260)
---
.../handler/requests/table/ClientTableCommon.java | 21 +-
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 107 +++++++--
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 42 +++-
.../Proto/BinaryTuple/BinaryTupleTests.cs | 4 +-
.../Proto/ColocationHashTests.cs | 149 ++++++++++++
.../Proto/BinaryTuple/BinaryTupleBuilder.cs | 257 ++++++++++++++++----
.../Proto/BinaryTuple/BinaryTupleCommon.cs | 11 +-
.../Proto/BinaryTuple/BinaryTupleReader.cs | 2 +-
.../Apache.Ignite/Internal/Proto/HashUtils.cs | 267 ++++++++++++++++++---
.../Apache.Ignite/Internal/Proto/ProtoCommon.cs | 6 +
.../runner/app/PlatformTestNodeRunner.java | 173 +++++++++++++
11 files changed, 932 insertions(+), 107 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index f40d5dbb2a..d769bca77c 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -43,9 +43,11 @@ import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.DecimalNativeType;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NumberNativeType;
import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.TemporalNativeType;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.IgniteException;
@@ -85,13 +87,14 @@ public class ClientTableCommon {
for (var colIdx = 0; colIdx < colCnt; colIdx++) {
var col = schema.column(colIdx);
- packer.packArrayHeader(6);
+ packer.packArrayHeader(7);
packer.packString(col.name());
packer.packInt(getClientDataType(col.type().spec()));
packer.packBoolean(schema.isKeyColumn(colIdx));
packer.packBoolean(col.nullable());
packer.packBoolean(colocationCols.contains(col));
packer.packInt(getDecimalScale(col.type()));
+ packer.packInt(getPrecision(col.type()));
}
}
@@ -523,4 +526,20 @@ public class ClientTableCommon {
private static int getDecimalScale(NativeType type) {
return type instanceof DecimalNativeType ? ((DecimalNativeType)
type).scale() : 0;
}
+
+ private static int getPrecision(NativeType type) {
+ if (type instanceof NumberNativeType) {
+ return ((NumberNativeType) type).precision();
+ }
+
+ if (type instanceof TemporalNativeType) {
+ return ((TemporalNativeType) type).precision();
+ }
+
+ if (type instanceof DecimalNativeType) {
+ return ((DecimalNativeType) type).precision();
+ }
+
+ return 0;
+ }
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 00a0041041..a70fa7d54f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -42,6 +42,16 @@ namespace Apache.Ignite.Tests
public const string ExistingTableName = "tbl1";
+ public const string CompositeKeyTableName = "tbl2";
+
+ public const string CustomColocationKeyTableName = "tbl3";
+
+ private static readonly Guid ExistingTableId = Guid.NewGuid();
+
+ private static readonly Guid CompositeKeyTableId = Guid.NewGuid();
+
+ private static readonly Guid CustomColocationKeyTableId =
Guid.NewGuid();
+
private readonly Socket _listener;
private readonly CancellationTokenSource _cts = new();
@@ -274,6 +284,72 @@ namespace Apache.Ignite.Tests
Send(handler, requestId, arrayBufferWriter);
}
+ private void GetSchemas(MessagePackReader reader, Socket handler, long
requestId)
+ {
+ var tableId = reader.ReadGuid();
+
+ using var arrayBufferWriter = new PooledArrayBufferWriter();
+ var writer = new MessagePackWriter(arrayBufferWriter);
+ writer.WriteMapHeader(1);
+ writer.Write(1); // Version.
+
+ if (tableId == ExistingTableId)
+ {
+ writer.WriteArrayHeader(1); // Columns.
+ writer.WriteArrayHeader(6); // Column props.
+ writer.Write("ID");
+ writer.Write((int)ClientDataType.Int32);
+ writer.Write(true); // Key.
+ writer.Write(false); // Nullable.
+ writer.Write(true); // Colocation.
+ writer.Write(0); // Scale.
+ }
+ else if (tableId == CompositeKeyTableId)
+ {
+ writer.WriteArrayHeader(2); // Columns.
+
+ writer.WriteArrayHeader(6); // Column props.
+ writer.Write("IdStr");
+ writer.Write((int)ClientDataType.String);
+ writer.Write(true); // Key.
+ writer.Write(false); // Nullable.
+ writer.Write(true); // Colocation.
+ writer.Write(0); // Scale.
+
+ writer.WriteArrayHeader(6); // Column props.
+ writer.Write("IdGuid");
+ writer.Write((int)ClientDataType.Uuid);
+ writer.Write(true); // Key.
+ writer.Write(false); // Nullable.
+ writer.Write(true); // Colocation.
+ writer.Write(0); // Scale.
+ }
+ else if (tableId == CustomColocationKeyTableId)
+ {
+ writer.WriteArrayHeader(2); // Columns.
+
+ writer.WriteArrayHeader(6); // Column props.
+ writer.Write("IdStr");
+ writer.Write((int)ClientDataType.String);
+ writer.Write(true); // Key.
+ writer.Write(false); // Nullable.
+ writer.Write(true); // Colocation.
+ writer.Write(0); // Scale.
+
+ writer.WriteArrayHeader(6); // Column props.
+ writer.Write("IdGuid");
+ writer.Write((int)ClientDataType.Uuid);
+ writer.Write(true); // Key.
+ writer.Write(false); // Nullable.
+ writer.Write(false); // Colocation.
+ writer.Write(0); // Scale.
+ }
+
+ writer.Flush();
+
+ Send(handler, requestId, arrayBufferWriter);
+ }
+
private void ListenLoop()
{
int requestCount = 0;
@@ -339,11 +415,19 @@ namespace Apache.Ignite.Tests
{
var tableName = reader.ReadString();
- if (tableName == ExistingTableName)
+ var tableId = tableName switch
+ {
+ ExistingTableName => ExistingTableId,
+ CompositeKeyTableName => CompositeKeyTableId,
+ CustomColocationKeyTableName =>
CustomColocationKeyTableId,
+ _ => default
+ };
+
+ if (tableId != default)
{
using var arrayBufferWriter = new
PooledArrayBufferWriter();
var writer = new
MessagePackWriter(arrayBufferWriter);
- writer.Write(Guid.Empty);
+ writer.Write(tableId);
writer.Flush();
Send(handler, requestId, arrayBufferWriter);
@@ -355,25 +439,8 @@ namespace Apache.Ignite.Tests
}
case ClientOp.SchemasGet:
- {
- using var arrayBufferWriter = new
PooledArrayBufferWriter();
- var writer = new
MessagePackWriter(arrayBufferWriter);
- writer.WriteMapHeader(1);
- writer.Write(1); // Version.
- writer.WriteArrayHeader(1); // Columns.
- writer.WriteArrayHeader(6); // Column props.
- writer.Write("ID");
- writer.Write((int)ClientDataType.Int32);
- writer.Write(true); // Key.
- writer.Write(false); // Nullable.
- writer.Write(true); // Colocation.
- writer.Write(0); // Scale.
-
- writer.Flush();
-
- Send(handler, requestId, arrayBufferWriter);
+ GetSchemas(reader, handler, requestId);
continue;
- }
case ClientOp.PartitionAssignmentGet:
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index 4845261d93..cd0b8f7772 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -26,10 +26,6 @@ using NUnit.Framework;
/// <summary>
/// Tests partition awareness.
-/// <para />
-/// TODO IGNITE-17969:
-/// * testCustomColocationKey
-/// * testCompositeKey.
/// </summary>
public class PartitionAwarenessTests
{
@@ -272,6 +268,41 @@ public class PartitionAwarenessTests
await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
}
+ [Test]
+ public async Task TestCompositeKey()
+ {
+ using var client = await GetClient();
+ var view = (await
client.Tables.GetTableAsync(FakeServer.CompositeKeyTableName))!.GetRecordView<CompositeKey>();
+
+ await view.UpsertAsync(null, new CompositeKey("1", Guid.Empty)); // Warm up.
+
+ await Test("1", Guid.Empty, _server1);
+ await Test("1", Guid.Parse("b0000000-0000-0000-0000-000000000000"),
_server2);
+
+ await Test("a", Guid.Empty, _server2);
+ await Test("a", Guid.Parse("b0000000-0000-0000-0000-000000000000"),
_server1);
+
+ async Task Test(string idStr, Guid idGuid, FakeServer node) =>
+ await AssertOpOnNode(() => view.UpsertAsync(null, new
CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
+ }
+
+ [Test]
+ public async Task TestCustomColocationKey()
+ {
+ using var client = await GetClient();
+ var view = (await
client.Tables.GetTableAsync(FakeServer.CustomColocationKeyTableName))!.GetRecordView<CompositeKey>();
+
+ // Warm up.
+ await view.UpsertAsync(null, new CompositeKey("1", Guid.Empty));
+
+ // Both columns are part of key, but only string column is colocation key, so random Guid does not affect the hash.
+ await Test("1", Guid.NewGuid(), _server2);
+ await Test("a", Guid.NewGuid(), _server1);
+
+ async Task Test(string idStr, Guid idGuid, FakeServer node) =>
+ await AssertOpOnNode(() => view.UpsertAsync(null, new
CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
+ }
+
private static async Task AssertOpOnNode(
Func<Task> action,
ClientOp op,
@@ -318,4 +349,7 @@ public class PartitionAwarenessTests
// Any server can be primary due to round-robin balancing in ClientFailoverSocket.
return _server1.ClientOps.Count > 0 ? (_server1, _server2) :
(_server2, _server1);
}
+
+ // ReSharper disable NotAccessedPositionalProperty.Local
+ private record CompositeKey(string IdStr, Guid IdGuid);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
index 3a59daf316..082947806a 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
@@ -478,13 +478,15 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple
b.AppendDate(val);
b.AppendDate(LocalDate.MaxIsoValue);
b.AppendDate(LocalDate.MinIsoValue);
+ b.AppendDate(new LocalDate(1, 1, 1));
},
- 4);
+ 5);
Assert.AreEqual(default(LocalDate), reader.GetDate(0));
Assert.AreEqual(val, reader.GetDate(1));
Assert.AreEqual(LocalDate.MaxIsoValue, reader.GetDate(2));
Assert.AreEqual(LocalDate.MinIsoValue, reader.GetDate(3));
+ Assert.AreEqual(new LocalDate(1, 1, 1), reader.GetDate(4));
}
[Test]
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
new file mode 100644
index 0000000000..6b9780b6c2
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Proto;
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Numerics;
+using System.Threading.Tasks;
+using Internal.Proto.BinaryTuple;
+using NodaTime;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests that colocation hash calculation is consistent with server logic.
+/// </summary>
+public class ColocationHashTests : IgniteTestsBase
+{
+ private const string ColocationHashJob =
"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ColocationHashJob";
+
+ private static readonly object[] TestCases =
+ {
+ sbyte.MinValue,
+ (sbyte)1,
+ (sbyte)-1,
+ sbyte.MaxValue,
+ short.MinValue,
+ (short)1,
+ (short)-1,
+ short.MaxValue,
+ int.MinValue,
+ 1,
+ 0,
+ -1,
+ int.MaxValue,
+ long.MinValue,
+ 1L,
+ -1L,
+ long.MaxValue,
+ float.MinValue,
+ -1.1f,
+ 1.1f,
+ float.Epsilon,
+ float.MaxValue,
+ double.MinValue,
+ -1.1d,
+ 1.1d,
+ double.Epsilon,
+ double.MaxValue,
+ decimal.MinValue,
+ -1.1m,
+ 1.1m,
+ 123.45678m,
+ decimal.MaxValue,
+ string.Empty,
+ "abc αβγ 🔥",
+ Guid.Empty,
+ Guid.NewGuid(),
+ BigInteger.One,
+ BigInteger.Zero,
+ BigInteger.MinusOne,
+ (BigInteger)int.MaxValue,
+ (BigInteger)int.MinValue,
+ (BigInteger)ulong.MaxValue,
+ BigInteger.Pow(123, 100),
+ new BitArray(1, false),
+ new BitArray(new byte[] {0, 5, 0}),
+ new BitArray(17, true),
+ new LocalDate(9876, 7, 30),
+ new LocalDate(2, 1, 1),
+ new LocalDate(1, 1, 1),
+ default(LocalDate),
+ new LocalTime(9, 8, 7),
+ LocalTime.Midnight,
+ LocalTime.Noon,
+ LocalDateTime.FromDateTime(DateTime.UtcNow).TimeOfDay,
+ default(LocalTime),
+ new LocalDateTime(year: 1, month: 1, day: 1, hour: 1, minute: 1,
second: 1, millisecond: 1),
+ new LocalDateTime(year: 2022, month: 10, day: 22, hour: 10, minute:
30, second: 55, millisecond: 123),
+ LocalDateTime.FromDateTime(DateTime.UtcNow),
+ default(LocalDateTime),
+ Instant.FromUnixTimeSeconds(0),
+ default(Instant)
+ };
+
+ [Test]
+ [TestCaseSource(nameof(TestCases))]
+ public async Task
TestSingleKeyColocationHashIsSameOnServerAndClient(object key) =>
+ await AssertClientAndServerHashesAreEqual(key);
+
+ [Test]
+ public async Task TestMultiKeyColocationHashIsSameOnServerAndClient()
+ {
+ for (var i = 0; i < TestCases.Length; i++)
+ {
+ await AssertClientAndServerHashesAreEqual(TestCases.Take(i +
1).ToArray());
+ await
AssertClientAndServerHashesAreEqual(TestCases.Skip(i).ToArray());
+ }
+ }
+
+ private static (byte[] Bytes, int Hash)
WriteAsBinaryTuple(IReadOnlyCollection<object> arr)
+ {
+ using var builder = new BinaryTupleBuilder(arr.Count * 3,
hashedColumnsPredicate: new TestIndexProvider());
+
+ foreach (var obj in arr)
+ {
+ builder.AppendObjectWithType(obj);
+ }
+
+ return (builder.Build().ToArray(), builder.Hash);
+ }
+
+ private async Task AssertClientAndServerHashesAreEqual(params object[]
keys)
+ {
+ var (bytes, hash) = WriteAsBinaryTuple(keys);
+
+ var serverHash = await GetServerHash(bytes, keys.Length);
+
+ Assert.AreEqual(serverHash, hash, string.Join(", ", keys));
+ }
+
+ private async Task<int> GetServerHash(byte[] bytes, int count)
+ {
+ var nodes = await Client.GetClusterNodesAsync();
+
+ return await Client.Compute.ExecuteAsync<int>(nodes,
ColocationHashJob, count, bytes);
+ }
+
+ private class TestIndexProvider : IHashedColumnIndexProvider
+ {
+ public bool IsHashedColumnIndex(int index) => index % 3 == 2;
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 8af5497f7b..4f1adc0161 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -124,6 +124,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
throw new InvalidOperationException("Appending a NULL value in
binary tuple builder with disabled NULLs");
}
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32((sbyte)0, _hash);
+ }
+
_hasNullValues = true;
int nullIndex = BinaryTupleCommon.NullOffset(_elementIndex);
@@ -139,6 +144,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// </summary>
public void AppendDefault()
{
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32((sbyte)0, _hash);
+ }
+
OnWrite();
}
@@ -148,6 +158,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendByte(sbyte value)
{
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
if (value != 0)
{
PutByte(value);
@@ -162,15 +177,24 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendShort(short value)
{
- if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
+ if (ShouldHash())
{
- AppendByte((sbyte)value);
+ _hash = HashUtils.Hash32(value, _hash);
}
- else
+
+ if (value != 0)
{
- PutShort(value);
- OnWrite();
+ if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
+ {
+ PutByte((sbyte)value);
+ }
+ else
+ {
+ PutShort(value);
+ }
}
+
+ OnWrite();
}
/// <summary>
@@ -179,25 +203,25 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendInt(int value)
{
- // TODO IGNITE-17969 Partition Awareness - support all key types
- if (_hashedColumnsPredicate?.IsHashedColumnIndex(_elementIndex) ==
true)
+ if (ShouldHash())
{
_hash = HashUtils.Hash32(value, _hash);
}
- if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
- {
- AppendByte((sbyte)value);
- return;
- }
-
- if (value >= short.MinValue && value <= short.MaxValue)
- {
- PutShort((short)value);
- }
- else
+ if (value != 0)
{
- PutInt(value);
+ if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
+ {
+ PutByte((sbyte)value);
+ }
+ else if (value >= short.MinValue && value <= short.MaxValue)
+ {
+ PutShort((short)value);
+ }
+ else
+ {
+ PutInt(value);
+ }
}
OnWrite();
@@ -209,19 +233,29 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendLong(long value)
{
- if (value >= short.MinValue && value <= short.MaxValue)
+ if (ShouldHash())
{
- AppendShort((short)value);
- return;
+ _hash = HashUtils.Hash32(value, _hash);
}
- if (value >= int.MinValue && value <= int.MaxValue)
- {
- PutInt((int)value);
- }
- else
+ if (value != 0)
{
- PutLong(value);
+ if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
+ {
+ PutByte((sbyte)value);
+ }
+ else if (value >= short.MinValue && value <= short.MaxValue)
+ {
+ PutShort((short)value);
+ }
+ else if (value >= int.MinValue && value <= int.MaxValue)
+ {
+ PutInt((int)value);
+ }
+ else
+ {
+ PutLong(value);
+ }
}
OnWrite();
@@ -233,6 +267,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendFloat(float value)
{
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
if (value != 0.0F)
{
PutFloat(value);
@@ -247,14 +286,24 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendDouble(double value)
{
- // ReSharper disable once CompareOfFloatsByEqualityOperator
- if (value == (float)value)
+ if (ShouldHash())
{
- AppendFloat((float)value);
- return;
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
+ if (value != 0.0d)
+ {
+ // ReSharper disable once CompareOfFloatsByEqualityOperator
+ if (value == (float)value)
+ {
+ PutFloat((float)value);
+ }
+ else
+ {
+ PutDouble(value);
+ }
}
- PutDouble(value);
OnWrite();
}
@@ -291,6 +340,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendBytes(Span<byte> value)
{
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
PutBytes(value);
OnWrite();
}
@@ -309,7 +363,22 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
if (value != default)
{
- UuidSerializer.Write(value, GetSpan(16));
+ var span = GetSpan(16);
+ UuidSerializer.Write(value, span);
+
+ if (ShouldHash())
+ {
+ _hash =
HashUtils.Hash32(BinaryPrimitives.ReadInt64LittleEndian(span[..8]), _hash);
+ _hash =
HashUtils.Hash32(BinaryPrimitives.ReadInt64LittleEndian(span[8..]), _hash);
+ }
+ }
+ else
+ {
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(0L, _hash);
+ _hash = HashUtils.Hash32(0L, _hash);
+ }
}
OnWrite();
@@ -327,7 +396,21 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
try
{
value.CopyTo(arr, 0);
- PutBytes(arr.AsSpan()[..size]);
+
+ // Trim zero bytes.
+ while (size > 0 && arr[size - 1] == 0)
+ {
+ size--;
+ }
+
+ var resBytes = arr.AsSpan()[..size];
+
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(resBytes, _hash);
+ }
+
+ PutBytes(resBytes);
OnWrite();
}
@@ -350,6 +433,13 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
PutDecimal(scale, unscaledValue, valueScale);
}
+ else
+ {
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(stackalloc byte[1] { 0 }, _hash);
+ }
+ }
OnWrite();
}
@@ -366,9 +456,21 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
var destination = GetSpan(size);
var success = value.TryWriteBytes(destination, out int
written, isBigEndian: true);
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(destination[..written], _hash);
+ }
+
Debug.Assert(success, "success");
Debug.Assert(written == size, "written == size");
}
+ else
+ {
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(stackalloc byte[1] { 0 }, _hash);
+ }
+ }
OnWrite();
}
@@ -379,7 +481,12 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendDate(LocalDate value)
{
- if (value != default)
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
+ if (value != BinaryTupleCommon.DefaultDate)
{
PutDate(value);
}
@@ -393,6 +500,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendTime(LocalTime value)
{
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
if (value != default)
{
PutTime(value);
@@ -407,7 +519,12 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendDateTime(LocalDateTime value)
{
- if (value != default)
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(value, _hash);
+ }
+
+ if (value != BinaryTupleCommon.DefaultDateTime)
{
PutDate(value.Date);
PutTime(value.TimeOfDay);
@@ -424,7 +541,21 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
if (value != default)
{
- PutTimestamp(value);
+ var (seconds, nanos) = PutTimestamp(value);
+
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(seconds, _hash);
+ _hash = HashUtils.Hash32((long)nanos, _hash);
+ }
+ }
+ else
+ {
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(0L, _hash);
+ _hash = HashUtils.Hash32(0L, _hash);
+ }
}
OnWrite();
@@ -436,6 +567,12 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendDuration(Duration value)
{
+ if (ShouldHash())
+ {
+ // Colocation keys can't include Duration.
+ throw new NotSupportedException("Duration hashing is not
supported.");
+ }
+
if (value != default)
{
PutDuration(value);
@@ -450,6 +587,12 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
/// <param name="value">Value.</param>
public void AppendPeriod(Period value)
{
+ if (ShouldHash())
+ {
+ // Colocation keys can't include Period.
+ throw new NotSupportedException("Period hashing is not
supported.");
+ }
+
if (value != Period.Zero)
{
PutPeriod(value);
@@ -603,10 +746,9 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
break;
case decimal dec:
- var (unscaled, scale) = DeconstructDecimal(dec);
+ var scale = GetDecimalScale(dec);
AppendTypeAndScale(ClientDataType.Decimal, scale);
- PutDecimal(scale, unscaled, scale);
- OnWrite();
+ AppendDecimal(dec, scale);
break;
case BigInteger bigInt:
@@ -741,6 +883,14 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
return (sign < 0 ? -unscaled : unscaled, scale);
}
+ private static int GetDecimalScale(decimal value)
+ {
+ Span<int> bits = stackalloc int[4];
+ decimal.GetBits(value, bits);
+
+ return (bits[3] & 0x00FF0000) >> 16;
+ }
+
private void PutDecimal(int scale, BigInteger unscaledValue, int
valueScale)
{
if (scale > valueScale)
@@ -756,6 +906,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
var destination = GetSpan(size);
var success = unscaledValue.TryWriteBytes(destination, out int
written, isBigEndian: true);
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(destination[..written], _hash);
+ }
+
Debug.Assert(success, "success");
Debug.Assert(written == size, "written == size");
}
@@ -778,18 +933,28 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
if (value.Length == 0)
{
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(Span<byte>.Empty, _hash);
+ }
+
return;
}
- var maxByteCount =
BinaryTupleCommon.StringEncoding.GetMaxByteCount(value.Length);
+ var maxByteCount =
ProtoCommon.StringEncoding.GetMaxByteCount(value.Length);
var span = _buffer.GetSpan(maxByteCount);
- var actualBytes = BinaryTupleCommon.StringEncoding.GetBytes(value,
span);
+ var actualBytes = ProtoCommon.StringEncoding.GetBytes(value, span);
+
+ if (ShouldHash())
+ {
+ _hash = HashUtils.Hash32(span[..actualBytes], _hash);
+ }
_buffer.Advance(actualBytes);
}
- private void PutTimestamp(Instant value)
+ private (long Seconds, int Nanos) PutTimestamp(Instant value)
{
// Logic taken from
// https://github.com/nodatime/nodatime.serialization/blob/main/src/NodaTime.Serialization.Protobuf/NodaExtensions.cs#L69
@@ -805,6 +970,8 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
PutInt(nanos);
}
+
+ return (seconds, nanos);
}
private void PutDuration(Duration value)
@@ -948,5 +1115,7 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
return span;
}
+
+ private bool ShouldHash() =>
_hashedColumnsPredicate?.IsHashedColumnIndex(_elementIndex) == true;
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
index ed5704f343..23b45a4a11 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
@@ -18,7 +18,7 @@
namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
using System.Diagnostics;
- using System.Text;
+ using NodaTime;
/// <summary>
/// Common binary tuple constants and utils.
@@ -41,9 +41,14 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
public const int NullmapFlag = 0b100;
/// <summary>
- /// UTF8 encoding without preamble (as opposed to <see cref="Encoding.UTF8"/>).
+ /// Default value for Date elements (Jan 1st 1 BC).
/// </summary>
- public static readonly Encoding StringEncoding = new
UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
+ public static readonly LocalDate DefaultDate = new(year: 0, month: 1,
day: 1);
+
+ /// <summary>
+ /// Default value for DateTime elements (Jan 1st 1 BC, 00:00:00).
+ /// </summary>
+ public static readonly LocalDateTime DefaultDateTime = new(year: 0,
month: 1, day: 1, hour: 0, minute: 0);
/// <summary>
/// Calculates flags for a given size of variable-length area.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
index 669e132932..5a9b228a0a 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
@@ -160,7 +160,7 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
public string GetString(int index) => Seek(index) switch
{
{ IsEmpty: true } => string.Empty,
- var s => BinaryTupleCommon.StringEncoding.GetString(s)
+ var s => ProtoCommon.StringEncoding.GetString(s)
};
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/HashUtils.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/HashUtils.cs
index 60bdec7eef..7e9a886e57 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/HashUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/HashUtils.cs
@@ -17,7 +17,10 @@
namespace Apache.Ignite.Internal.Proto;
+using System;
+using System.Buffers.Binary;
using System.Numerics;
+using NodaTime;
/// <summary>
/// Hash function based on MurmurHash3
@@ -30,65 +33,263 @@ internal static class HashUtils
private const ulong C1 = 0x87c37b91114253d5L;
private const ulong C2 = 0x4cf5ad432745937fL;
private const int R1 = 31;
-
- // TODO IGNITE-17969 Partition Awareness - support all key types
- /*private const int R2 = 27;
+ private const int R2 = 27;
private const int R3 = 33;
private const int M = 5;
private const int N1 = 0x52dce729;
- private const int N2 = 0x38495ab5;*/
+ private const int N2 = 0x38495ab5;
+
+ /// <summary>
+ /// Generates 32-bit hash.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash.</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(sbyte data, int seed) =>
Hash32Internal((ulong)(data & 0xffL), (ulong)seed, 1);
+
+ /// <summary>
+ /// Generates 32-bit hash.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash.</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(short data, int seed) =>
Hash32Internal((ulong)(data & 0xffffL), (ulong)seed, 2);
+
+ /// <summary>
+ /// Generates 32-bit hash.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash.</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(int data, int seed) =>
Hash32Internal((ulong)(data & 0xffffffffL), (ulong)seed, 4);
+
+ /// <summary>
+ /// Generates 32-bit hash.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash.</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(long data, int seed) =>
Hash32Internal((ulong)data, (ulong)seed, 8);
+
+ /// <summary>
+ /// Generates 32-bit hash.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash.</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(float data, int seed) =>
Hash32(BitConverter.SingleToInt32Bits(data), seed);
+
+ /// <summary>
+ /// Generates 32-bit hash.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash.</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(double data, int seed) =>
Hash32(BitConverter.DoubleToInt64Bits(data), seed);
/// <summary>
- /// Generates 32-bit hash from the integer value.
+ /// Generates 32-bit hash of a span of bytes.
/// </summary>
/// <param name="data">Input data.</param>
/// <param name="seed">Current hash.</param>
/// <returns>Resulting hash.</returns>
- public static int Hash32(int data, int seed)
+ public static int Hash32(Span<byte> data, int seed) =>
Hash32Internal(data, (ulong)seed & 0xffffffffL); // The mask keeps only the low 32 bits of the seed.
+
+ /// <summary>
+ /// Generates 32-bit hash of a date by hashing its year, month and day components.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash (used as the seed for the first component).</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(LocalDate data, int seed) =>
Hash32((long)data.Day, Hash32((long)data.Month, Hash32((long)data.Year, seed))); // Evaluated inside-out: Year, then Month, then Day; each component is hashed as a long with the previous result as its seed.
+
+ /// <summary>
+ /// Generates 32-bit hash of a time by hashing its hour, minute, second and nanosecond components.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash (used as the seed for the first component).</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(LocalTime data, int seed)
{
- ulong hash = Hash64(data, seed);
+ // TODO IGNITE-17992 Account for column precision. Until then the full NanosecondOfSecond value is hashed.
+ return Hash32((long)data.NanosecondOfSecond, Hash32((long)data.Second,
Hash32((long)data.Minute, Hash32((long)data.Hour, seed)))); // Evaluated inside-out: Hour, Minute, Second, NanosecondOfSecond; each result seeds the next.
+ }
+
+ /// <summary>
+ /// Generates 32-bit hash of a date-time by hashing its date part and then its time part.
+ /// </summary>
+ /// <param name="data">Input data.</param>
+ /// <param name="seed">Current hash (used as the seed for the date part).</param>
+ /// <returns>Resulting hash.</returns>
+ public static int Hash32(LocalDateTime data, int seed) =>
Hash32(data.TimeOfDay, Hash32(data.Date, seed)); // The date hash becomes the seed of the time hash.
- return (int)(hash ^ (hash >> 32));
+ private static int Hash32Internal(ulong data, ulong seed, byte byteCount) // Hashes one value already widened to ulong; byteCount is forwarded to Hash64Internal.
+ {
+ var hash64 = Hash64Internal(data, seed, byteCount);
+
+ return (int)(hash64 ^ (hash64 >> 32)); // Folds the 64-bit hash into 32 bits by XOR-ing its upper half into its lower half.
+ }
+
+ private static ulong Hash64Internal(ulong data, ulong seed, byte byteCount) // 64-bit hash of one value of at most 8 bytes; byteCount is mixed in during finalization.
+ {
+ unchecked
+ {
+ ulong h1 = seed; // Both state words start from the same seed.
+ ulong h2 = seed;
+
+ ulong k1 = 0;
+
+ k1 ^= data; // k1 starts at 0, so this is equivalent to k1 = data.
+ k1 *= C1;
+ k1 = BitOperations.RotateLeft(k1, R1);
+ k1 *= C2;
+ h1 ^= k1; // Only h1 receives the mixed data; h2 is untouched until finalization.
+
+ // finalization
+ h1 ^= byteCount;
+ h2 ^= byteCount;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = Fmix64(h1);
+ h2 = Fmix64(h2);
+
+ return h1 + h2;
+ }
}
- private static ulong Hash64(int data, long seed)
+ private static int Hash32Internal(Span<byte> data, ulong seed) // Hashes a byte span with Hash64Internal and folds the result into 32 bits.
{
- return HashInternal((uint)data, (ulong)seed);
+ var hash64 = Hash64Internal(data, seed);
+
+ return (int)(hash64 ^ (hash64 >> 32)); // XOR of the upper and lower 32-bit halves.
}
- private static ulong HashInternal(uint data, ulong seed)
+ private static ulong Hash64Internal(Span<byte> data, ulong seed) // 64-bit hash of a byte span: 16-byte blocks, then a 0..15 byte tail, then finalization.
{
- ulong h1 = seed;
- ulong h2 = seed;
+ unchecked
+ {
+ ulong h1 = seed;
+ ulong h2 = seed;
+ var length = data.Length;
+ int nblocks = length >> 4; // Number of complete 16-byte blocks.
+
+ // body
+ for (int i = 0; i < nblocks; i++)
+ {
+ int idx = (i << 4); // Byte offset of block i.
+ ulong kk1 =
BinaryPrimitives.ReadUInt64LittleEndian(data.Slice(idx));
+ ulong kk2 =
BinaryPrimitives.ReadUInt64LittleEndian(data.Slice(idx + 8));
+
+ // mix functions for k1
+ kk1 *= C1;
+ kk1 = BitOperations.RotateLeft(kk1, R1);
+ kk1 *= C2;
+ h1 ^= kk1;
+ h1 = BitOperations.RotateLeft(h1, R2);
+ h1 += h2;
+ h1 = h1 * M + N1;
- ulong k1 = 0;
+ // mix functions for k2
+ kk2 *= C2;
+ kk2 = BitOperations.RotateLeft(kk2, R3);
+ kk2 *= C1;
+ h2 ^= kk2;
+ h2 = BitOperations.RotateLeft(h2, R1);
+ h2 += h1;
+ h2 = h2 * M + N2;
+ }
- k1 ^= data & 0xffffffffUL;
- k1 *= C1;
- k1 = BitOperations.RotateLeft(k1, R1);
- k1 *= C2;
- h1 ^= k1;
+ // tail: the remaining (length - index) bytes, 0..15; each case adds one byte and jumps to the next lower case.
+ ulong k1 = 0;
+ ulong k2 = 0;
+ int index = nblocks << 4; // Offset of the first byte not covered by a full block.
+ switch (length - index)
+ {
+ case 15:
+ k2 ^= ((ulong)data[index + 14] & 0xff) << 48;
+ goto case 14;
+ case 14:
+ k2 ^= ((ulong)data[index + 13] & 0xff) << 40;
+ goto case 13;
+ case 13:
+ k2 ^= ((ulong)data[index + 12] & 0xff) << 32;
+ goto case 12;
+ case 12:
+ k2 ^= ((ulong)data[index + 11] & 0xff) << 24;
+ goto case 11;
+ case 11:
+ k2 ^= ((ulong)data[index + 10] & 0xff) << 16;
+ goto case 10;
+ case 10:
+ k2 ^= ((ulong)data[index + 9] & 0xff) << 8;
+ goto case 9;
- // finalization
- h1 ^= 4;
- h2 ^= 4;
+ case 9:
+ k2 ^= (ulong)data[index + 8] & 0xff;
+ k2 *= C2; // Tail bytes 8..14 are now assembled in k2; mix them into h2.
+ k2 = BitOperations.RotateLeft(k2, R3);
+ k2 *= C1;
+ h2 ^= k2;
+ goto case 8;
- h1 += h2;
- h2 += h1;
+ case 8:
+ k1 ^= ((ulong)data[index + 7] & 0xff) << 56;
+ goto case 7;
+ case 7:
+ k1 ^= ((ulong)data[index + 6] & 0xff) << 48;
+ goto case 6;
+ case 6:
+ k1 ^= ((ulong)data[index + 5] & 0xff) << 40;
+ goto case 5;
+ case 5:
+ k1 ^= ((ulong)data[index + 4] & 0xff) << 32;
+ goto case 4;
+ case 4:
+ k1 ^= ((ulong)data[index + 3] & 0xff) << 24;
+ goto case 3;
+ case 3:
+ k1 ^= ((ulong)data[index + 2] & 0xff) << 16;
+ goto case 2;
+ case 2:
+ k1 ^= ((ulong)data[index + 1] & 0xff) << 8;
+ goto case 1;
- h1 = Fmix64(h1);
- h2 = Fmix64(h2);
+ case 1:
+ k1 ^= (ulong)data[index] & 0xff;
+ k1 *= C1; // Tail bytes 0..7 are now assembled in k1; mix them into h1.
+ k1 = BitOperations.RotateLeft(k1, R1);
+ k1 *= C2;
+ h1 ^= k1;
+ break;
+ }
- return h1 + h2;
+ // finalization: mix in the total input length, cross-add the two state words, then run Fmix64 on each.
+ h1 ^= (ulong)length;
+ h2 ^= (ulong)length;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = Fmix64(h1);
+ h2 = Fmix64(h2);
+
+ return h1 + h2;
+ }
}
private static ulong Fmix64(ulong hash)
{
- hash ^= (hash >> 33);
- hash *= 0xff51afd7ed558ccdL;
- hash ^= (hash >> 33);
- hash *= 0xc4ceb9fe1a85ec53L;
- hash ^= (hash >> 33);
- return hash;
+ unchecked // Final mixing step: three xor-shift rounds interleaved with two multiplications; the multiplications are intended to wrap.
+ {
+ hash ^= hash >> 33;
+ hash *= 0xff51afd7ed558ccdL; // NOTE(review): constants look like the MurmurHash3 fmix64 finalizer - confirm against the Java HashUtils.
+ hash ^= hash >> 33;
+ hash *= 0xc4ceb9fe1a85ec53L;
+ hash ^= hash >> 33;
+
+ return hash;
+ }
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
index 3a077f4c50..60c2b48d76 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Internal.Proto
{
+ using System.Text;
using Buffers;
/// <summary>
@@ -34,6 +35,11 @@ namespace Apache.Ignite.Internal.Proto
/// </summary>
public static readonly byte[] MagicBytes = { (byte)'I', (byte)'G',
(byte)'N', (byte)'I' };
+ /// <summary>
+ /// UTF8 encoding without preamble, i.e. no UTF8 identifier is emitted (as opposed to <see
cref="Encoding.UTF8"/>).
+ /// </summary>
+ public static readonly Encoding StringEncoding = new
UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
+
/// <summary>
/// Gets a new message writer.
/// </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 386c6f6975..02a7343884 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -22,6 +22,8 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -31,6 +33,17 @@ import org.apache.ignite.IgnitionManager;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.client.proto.ClientDataType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.row.Row;
import
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
@@ -38,6 +51,7 @@ import
org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.Tuple;
/**
* Helper class for non-Java platform tests (.NET, C++, Python, ...). Starts
nodes, populates tables and data for tests.
@@ -268,4 +282,163 @@ public class PlatformTestNodeRunner {
throw new RuntimeException("Test exception: " + args[0]);
}
}
+
+ /**
+ * Compute job that computes row colocation hash. Takes the column count (args[0]) and a binary tuple (args[1]) that holds a (type, scale, value) triple per column.
+ */
+ @SuppressWarnings({"unused"}) // Used by platform tests.
+ private static class ColocationHashJob implements ComputeJob<Integer> {
+ @Override
+ public Integer execute(JobExecutionContext context, Object... args) {
+ var columnCount = (int) args[0];
+ var buf = (byte[]) args[1];
+ var columns = new Column[columnCount];
+ var tuple = Tuple.create(columnCount);
+ var reader = new BinaryTupleReader(columnCount * 3, buf); // Three tuple elements per column: type, scale, value.
+
+ for (int i = 0; i < columnCount; i++) {
+ var type = reader.intValue(i * 3); // Compared against ClientDataType constants below.
+ var scale = reader.intValue(i * 3 + 1); // Only used by the DECIMAL case.
+ var valIdx = i * 3 + 2; // Index of the value element for this column.
+
+ String colName = "col" + i;
+
+ switch (type) { // Each case declares a non-nullable column of the matching native type and copies the value into the tuple.
+ case ClientDataType.INT8:
+ columns[i] = new Column(i, colName, NativeTypes.INT8,
false);
+ tuple.set(colName, reader.byteValue(valIdx));
+ break;
+
+ case ClientDataType.INT16:
+ columns[i] = new Column(i, colName, NativeTypes.INT16,
false);
+ tuple.set(colName, reader.shortValue(valIdx));
+ break;
+
+ case ClientDataType.INT32:
+ columns[i] = new Column(i, colName, NativeTypes.INT32,
false);
+ tuple.set(colName, reader.intValue(valIdx));
+ break;
+
+ case ClientDataType.INT64:
+ columns[i] = new Column(i, colName, NativeTypes.INT64,
false);
+ tuple.set(colName, reader.longValue(valIdx));
+ break;
+
+ case ClientDataType.FLOAT:
+ columns[i] = new Column(i, colName, NativeTypes.FLOAT,
false);
+ tuple.set(colName, reader.floatValue(valIdx));
+ break;
+
+ case ClientDataType.DOUBLE:
+ columns[i] = new Column(i, colName,
NativeTypes.DOUBLE, false);
+ tuple.set(colName, reader.doubleValue(valIdx));
+ break;
+
+ case ClientDataType.DECIMAL:
+ columns[i] = new Column(i, colName,
NativeTypes.decimalOf(100, scale), false);
+ tuple.set(colName, reader.decimalValue(valIdx, scale)); // The same scale is used for the column type and for decoding the value.
+ break;
+
+ case ClientDataType.STRING:
+ columns[i] = new Column(i, colName,
NativeTypes.STRING, false);
+ tuple.set(colName, reader.stringValue(valIdx));
+ break;
+
+ case ClientDataType.UUID:
+ columns[i] = new Column(i, colName, NativeTypes.UUID,
false);
+ tuple.set(colName, reader.uuidValue(valIdx));
+ break;
+
+ case ClientDataType.NUMBER:
+ columns[i] = new Column(i, colName,
NativeTypes.numberOf(255), false);
+ tuple.set(colName, reader.numberValue(valIdx));
+ break;
+
+ case ClientDataType.BITMASK:
+ columns[i] = new Column(i, colName,
NativeTypes.bitmaskOf(32), false);
+ tuple.set(colName, reader.bitmaskValue(valIdx));
+ break;
+
+ case ClientDataType.DATE:
+ columns[i] = new Column(i, colName, NativeTypes.DATE,
false);
+ tuple.set(colName, reader.dateValue(valIdx));
+ break;
+
+ case ClientDataType.TIME:
+ columns[i] = new Column(i, colName,
NativeTypes.time(9), false);
+ tuple.set(colName, reader.timeValue(valIdx));
+ break;
+
+ case ClientDataType.DATETIME:
+ columns[i] = new Column(i, colName,
NativeTypes.datetime(9), false);
+ tuple.set(colName, reader.dateTimeValue(valIdx));
+ break;
+
+ case ClientDataType.TIMESTAMP:
+ columns[i] = new Column(i, colName,
NativeTypes.timestamp(), false);
+ tuple.set(colName, reader.timestampValue(valIdx));
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unsupported type:
" + type);
+ }
+ }
+
+ var colocationColumns =
Arrays.stream(columns).map(Column::name).toArray(String[]::new); // Every column takes part in colocation.
+ var schema = new SchemaDescriptor(1, columns, colocationColumns,
new Column[0]); // All columns go into the first array; the second column array is empty.
+
+ var marsh = new TupleMarshallerImpl(new
TestSchemaRegistry(schema));
+
+ try {
+ Row row = marsh.marshal(tuple);
+
+ return row.colocationHash(); // Hash computed on the Java side, returned to the calling platform test.
+ } catch (TupleMarshallerException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static class TestSchemaRegistry implements SchemaRegistry { // Registry stub that serves one fixed schema; used by ColocationHashJob.
+ private final SchemaDescriptor schema;
+
+ private TestSchemaRegistry(SchemaDescriptor schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public SchemaDescriptor schema() {
+ return schema;
+ }
+
+ @Override
+ public SchemaDescriptor schema(int ver) throws SchemaRegistryException
{
+ return schema; // The requested version is ignored.
+ }
+
+ @Override
+ public SchemaDescriptor waitLatestSchema() {
+ return schema; // Returns immediately, no waiting.
+ }
+
+ @Override
+ public int lastSchemaVersion() {
+ return 0; // NOTE(review): ColocationHashJob passes 1 as the first SchemaDescriptor argument (presumably the version) - confirm 0 is intended.
+ }
+
+ @Override
+ public Row resolve(BinaryRow row, SchemaDescriptor desc) {
+ return null; // Not implemented in this stub.
+ }
+
+ @Override
+ public Row resolve(BinaryRow row) {
+ return null; // Not implemented in this stub.
+ }
+
+ @Override
+ public Collection<Row> resolve(Collection<BinaryRow> rows) {
+ return null; // Not implemented in this stub.
+ }
+ }
}