This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 60acfd0922 IGNITE-19626 .NET: Propagate compute deployment units (#2248)
60acfd0922 is described below
commit 60acfd0922e497f342e389837a74ef4f2affe178
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jun 26 17:36:56 2023 +0300
IGNITE-19626 .NET: Propagate compute deployment units (#2248)
* Propagate deployment units to .NET Compute API
* Fix NPE in `DeploymentManagerImpl.detectLatestDeployedVersion`
* Fix compute job hang in `ComputeComponentImpl.processExecuteRequest`
---
.../org/apache/ignite/compute/IgniteCompute.java | 8 +-
.../apache/ignite/client/ClientComputeTest.java | 1 +
.../internal/deployunit/DeploymentManagerImpl.java | 16 ++-
.../internal/deployunit/IgniteDeployment.java | 2 +-
.../internal/compute/ComputeComponentImpl.java | 17 ++-
.../Apache.Ignite.Tests/BasicAuthenticatorTests.cs | 4 +-
.../Compute/ComputeClusterAwarenessTests.cs | 20 +++-
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 120 ++++++++++++++++----
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 58 ++++++++--
.../Linq/LinqSqlGenerationTests.cs | 2 +-
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 11 +-
.../Proto/ColocationHashTests.cs | 10 +-
.../platforms/dotnet/Apache.Ignite.sln.DotSettings | 1 +
.../dotnet/Apache.Ignite/Compute/DeploymentUnit.cs | 33 ++++++
.../dotnet/Apache.Ignite/Compute/ICompute.cs | 121 ++++++++++++---------
.../Apache.Ignite/Internal/Compute/Compute.cs | 93 ++++++++++++++--
.../runner/app/client/ItThinClientComputeTest.java | 35 ++++++
17 files changed, 433 insertions(+), 119 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 038d8fe391..119c6231ea 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -36,7 +36,7 @@ public interface IgniteCompute {
* Executes a {@link ComputeJob} of the given class on a single node from
a set of candidate nodes.
*
* @param nodes Candidate nodes; the job will be executed on one of
them.
- * @param units Deployment units.
+ * @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
* @param <R> Job result type
@@ -50,7 +50,7 @@ public interface IgniteCompute {
*
* @param tableName Name of the table whose key is used to determine the
node to execute the job on.
* @param key Key that identifies the node to execute the job on.
- * @param units Deployment units.
+ * @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
* @param <R> Job result type.
@@ -65,7 +65,7 @@ public interface IgniteCompute {
* @param tableName Name of the table whose key is used to determine the
node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param keyMapper Mapper used to map the key to a binary representation.
- * @param units Deployment units.
+ * @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
* @param <R> Job result type.
@@ -84,7 +84,7 @@ public interface IgniteCompute {
* Executes a {@link ComputeJob} of the given class on all nodes in the
given node set.
*
* @param nodes Nodes to execute the job on.
- * @param units Deployment units.
+ * @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
* @param <R> Job result type.
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index c884907379..fd6e783bb8 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -171,6 +171,7 @@ public class ClientComputeTest {
assertEquals("", getUnits.apply(List.of()));
assertEquals("u1:1.2.3", getUnits.apply(List.of(new
DeploymentUnit("u1", "1.2.3"))));
+ assertEquals("u:latest", getUnits.apply(List.of(new
DeploymentUnit("u", "LaTeSt"))));
assertEquals(
"u1:1.2.3,unit2:latest",
getUnits.apply(List.of(new DeploymentUnit("u1", "1.2.3"),
new DeploymentUnit("unit2", Version.LATEST))));
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 4f24d6a31f..3b4c0f28dd 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -339,11 +339,17 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
@Override
public CompletableFuture<Version> detectLatestDeployedVersion(String id) {
return clusterStatusesAsync(id)
- .thenApply(statuses -> statuses.versionStatuses().stream()
- .filter(e -> e.getStatus() == DEPLOYED)
- .reduce((first, second) -> second)
- .orElseThrow(() -> new
DeploymentUnitNotFoundException(id))
- .getVersion());
+ .thenApply(statuses -> {
+ if (statuses == null) {
+ throw new DeploymentUnitNotFoundException(id,
Version.LATEST);
+ }
+
+ return statuses.versionStatuses().stream()
+ .filter(e -> e.getStatus() == DEPLOYED)
+ .reduce((first, second) -> second)
+ .orElseThrow(() -> new
DeploymentUnitNotFoundException(id, Version.LATEST))
+ .getVersion();
+ });
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
index 2f3a7782d9..980427f091 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
@@ -86,7 +86,7 @@ public interface IgniteDeployment extends IgniteComponent {
* Lists all versions of the unit.
*
* @param id Unit identifier.
- * @return Future with the unit statuses.
+ * @return Future with the unit statuses. Result of the future can be null
when the specified unit does not exist.
*/
CompletableFuture<UnitStatuses> clusterStatusesAsync(String id);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 3aa401cfb4..5514632f4d 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -238,12 +238,19 @@ public class ComputeComponentImpl implements
ComputeComponent {
List<DeploymentUnit> units =
toDeploymentUnit(executeRequest.deploymentUnits());
mapClassLoaderExceptions(jobClassLoader(units),
executeRequest.jobClassName())
- .thenCompose(context -> {
- return
doExecuteLocally(jobClass(context.classLoader(),
executeRequest.jobClassName()), executeRequest.args())
- .whenComplete((r, e) ->
context.close())
- .handle((result, ex) ->
sendExecuteResponse(result, ex, senderConsistentId, correlationId));
+ .whenComplete((context, err) -> {
+ if (err != null) {
+ if (context != null) {
+ context.close();
}
- );
+
+ sendExecuteResponse(null, err, senderConsistentId,
correlationId);
+ }
+
+ doExecuteLocally(jobClass(context.classLoader(),
executeRequest.jobClassName()), executeRequest.args())
+ .whenComplete((r, e) -> context.close())
+ .handle((result, ex) ->
sendExecuteResponse(result, ex, senderConsistentId, correlationId));
+ });
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
index 66eb808ba0..8a98732cd2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
@@ -17,7 +17,9 @@
namespace Apache.Ignite.Tests;
+using System;
using System.Threading.Tasks;
+using Ignite.Compute;
using NUnit.Framework;
using Security;
@@ -101,7 +103,7 @@ public class BasicAuthenticatorTests : IgniteTestsBase
try
{
- await client.Compute.ExecuteAsync<object>(nodes, EnableAuthnJob,
enable ? 1 : 0);
+ await client.Compute.ExecuteAsync<object>(nodes,
Array.Empty<DeploymentUnit>(), EnableAuthnJob, enable ? 1 : 0);
}
catch (IgniteClientConnectionException)
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index 43c796c822..ee3e78a8b1 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -17,9 +17,11 @@
namespace Apache.Ignite.Tests.Compute
{
+ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
+ using Ignite.Compute;
using Internal.Proto;
using NUnit.Framework;
@@ -45,8 +47,11 @@ namespace Apache.Ignite.Tests.Compute
// ReSharper disable once AccessToDisposedClosure
TestUtils.WaitForCondition(() => client.GetConnections().Count ==
3, 5000);
- var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[]
{ server2.Node }, jobClassName: string.Empty);
- var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[]
{ server3.Node }, jobClassName: string.Empty);
+ var res2 = await client.Compute.ExecuteAsync<string>(
+ new[] { server2.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
+
+ var res3 = await client.Compute.ExecuteAsync<string>(
+ new[] { server3.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
Assert.AreEqual("s2", res2);
Assert.AreEqual("s3", res3);
@@ -66,8 +71,11 @@ namespace Apache.Ignite.Tests.Compute
using var client = await server1.ConnectClientAsync();
- var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[]
{ server2.Node }, jobClassName: string.Empty);
- var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[]
{ server3.Node }, jobClassName: string.Empty);
+ var res2 = await client.Compute.ExecuteAsync<string>(
+ new[] { server2.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
+
+ var res3 = await client.Compute.ExecuteAsync<string>(
+ new[] { server3.Node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
Assert.AreEqual("s1", res2);
Assert.AreEqual("s1", res3);
@@ -101,7 +109,9 @@ namespace Apache.Ignite.Tests.Compute
for (int i = 0; i < 100; i++)
{
var node = i % 2 == 0 ? server1.Node : server2.Node;
- var res = await client.Compute.ExecuteAsync<string>(nodes:
new[] { node }, jobClassName: string.Empty);
+ var res = await client.Compute.ExecuteAsync<string>(
+ new[] { node }, Array.Empty<DeploymentUnit>(),
jobClassName: string.Empty);
+
nodeNames.Add(res);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 23fff5ffd6..5a610e3b27 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -55,6 +55,8 @@ namespace Apache.Ignite.Tests.Compute
private const string ExceptionJob = PlatformTestNodeRunner +
"$ExceptionJob";
+ private static readonly IList<DeploymentUnit> Units =
Array.Empty<DeploymentUnit>();
+
[Test]
public async Task TestGetClusterNodes()
{
@@ -76,8 +78,8 @@ namespace Apache.Ignite.Tests.Compute
[Test]
public async Task TestExecuteOnSpecificNode()
{
- var res1 = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(0), NodeNameJob, "-", 11);
- var res2 = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), NodeNameJob, ":", 22);
+ var res1 = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(0), Units, NodeNameJob, "-", 11);
+ var res2 = await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), Units, NodeNameJob, ":", 22);
Assert.AreEqual(PlatformTestNodeRunner + "-_11", res1);
Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", res2);
@@ -86,7 +88,7 @@ namespace Apache.Ignite.Tests.Compute
[Test]
public async Task TestExecuteOnRandomNode()
{
- var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), NodeNameJob);
+ var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, NodeNameJob);
var expectedNodeNames = Enumerable.Range(1, 4)
.Select(x => x == 1 ? PlatformTestNodeRunner :
PlatformTestNodeRunner + "_" + x)
@@ -99,7 +101,7 @@ namespace Apache.Ignite.Tests.Compute
public void TestExecuteResultTypeMismatchThrowsInvalidCastException()
{
Assert.ThrowsAsync<InvalidCastException>(async () =>
- await Client.Compute.ExecuteAsync<Guid>(await
Client.GetClusterNodesAsync(), NodeNameJob));
+ await Client.Compute.ExecuteAsync<Guid>(await
Client.GetClusterNodesAsync(), Units, NodeNameJob));
}
[Test]
@@ -107,7 +109,7 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await GetNodeAsync(0);
- IDictionary<IClusterNode, Task<string>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, NodeNameJob, "123");
+ IDictionary<IClusterNode, Task<string>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
var res = await taskMap[nodes[0]];
Assert.AreEqual(1, taskMap.Count);
@@ -121,7 +123,7 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await Client.GetClusterNodesAsync();
- IDictionary<IClusterNode, Task<string>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, NodeNameJob, "123");
+ IDictionary<IClusterNode, Task<string>> taskMap =
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
var res1 = await taskMap[nodes[0]];
var res2 = await taskMap[nodes[1]];
var res3 = await taskMap[nodes[2]];
@@ -138,7 +140,7 @@ namespace Apache.Ignite.Tests.Compute
[Test]
public async Task TestExecuteWithArgs()
{
- var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), ConcatJob, 1.1, Guid.Empty, "3", null);
+ var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ConcatJob, 1.1, Guid.Empty, "3", null);
Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null",
res);
}
@@ -146,7 +148,7 @@ namespace Apache.Ignite.Tests.Compute
[Test]
public async Task TestExecuteWithNullArgs()
{
- var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), ConcatJob, args: null);
+ var res = await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ConcatJob, args: null);
Assert.IsNull(res);
}
@@ -155,7 +157,7 @@ namespace Apache.Ignite.Tests.Compute
public void TestJobErrorPropagatesToClientWithClassAndMessage()
{
var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), ErrorJob, "unused"));
+ await Client.Compute.ExecuteAsync<string>(await
Client.GetClusterNodesAsync(), Units, ErrorJob, "unused"));
StringAssert.Contains("Custom job error", ex!.Message);
@@ -175,7 +177,7 @@ namespace Apache.Ignite.Tests.Compute
var unknownNode = new ClusterNode("x", "y", new
IPEndPoint(IPAddress.Loopback, 0));
var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteAsync<string>(new[] { unknownNode
}, EchoJob, "unused"));
+ await Client.Compute.ExecuteAsync<string>(new[] { unknownNode
}, Units, EchoJob, "unused"));
StringAssert.Contains("Specified node is not present in the
cluster: y", ex!.Message);
}
@@ -223,7 +225,7 @@ namespace Apache.Ignite.Tests.Compute
{
var nodes = await Client.GetClusterNodesAsync();
var str = expectedStr ?? val.ToString()!.Replace("E+", "E");
- var res = await Client.Compute.ExecuteAsync<object>(nodes,
EchoJob, val, str);
+ var res = await Client.Compute.ExecuteAsync<object>(nodes,
Units, EchoJob, val, str);
Assert.AreEqual(val, res);
}
@@ -238,13 +240,13 @@ namespace Apache.Ignite.Tests.Compute
public async Task TestExecuteColocated(long key, string nodeName)
{
var keyTuple = new IgniteTuple { [KeyCol] = key };
- var resNodeName = await
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, NodeNameJob);
+ var resNodeName = await
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, Units,
NodeNameJob);
var keyPoco = new Poco { Key = key };
- var resNodeName2 = await
Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco,
NodeNameJob);
+ var resNodeName2 = await
Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco,
Units.Reverse(), NodeNameJob);
var keyPocoStruct = new PocoStruct(key, null);
- var resNodeName3 = await
Client.Compute.ExecuteColocatedAsync<string, PocoStruct>(TableName,
keyPocoStruct, NodeNameJob);
+ var resNodeName3 = await
Client.Compute.ExecuteColocatedAsync<string, PocoStruct>(TableName,
keyPocoStruct, Units, NodeNameJob);
var expectedNodeName = PlatformTestNodeRunner + nodeName;
Assert.AreEqual(expectedNodeName, resNodeName);
@@ -256,7 +258,7 @@ namespace Apache.Ignite.Tests.Compute
public void TestExecuteColocatedThrowsWhenTableDoesNotExist()
{
var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
- await
Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(),
EchoJob));
+ await
Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(),
Units, EchoJob));
Assert.AreEqual("Table 'unknownTable' does not exist.",
ex!.Message);
}
@@ -265,7 +267,7 @@ namespace Apache.Ignite.Tests.Compute
public void TestExecuteColocatedThrowsWhenKeyColumnIsMissing()
{
var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteColocatedAsync<string>(TableName,
new IgniteTuple(), EchoJob));
+ await Client.Compute.ExecuteColocatedAsync<string>(TableName,
new IgniteTuple(), Units, EchoJob));
StringAssert.Contains("Missed key column: KEY", ex!.Message);
}
@@ -275,25 +277,25 @@ namespace Apache.Ignite.Tests.Compute
{
// Create table and use it in ExecuteColocated.
var nodes = await GetNodeAsync(0);
- var tableName = await Client.Compute.ExecuteAsync<string>(nodes,
CreateTableJob, "drop_me");
+ var tableName = await Client.Compute.ExecuteAsync<string>(nodes,
Units, CreateTableJob, "drop_me");
try
{
var keyTuple = new IgniteTuple { [KeyCol] = 1L };
- var resNodeName = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+ var resNodeName = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units,
NodeNameJob);
// Drop table and create a new one with a different ID, then
execute a computation again.
// This should update the cached table and complete the
computation successfully.
- await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob,
tableName);
- await Client.Compute.ExecuteAsync<string>(nodes,
CreateTableJob, tableName);
+ await Client.Compute.ExecuteAsync<string>(nodes, Units,
DropTableJob, tableName);
+ await Client.Compute.ExecuteAsync<string>(nodes, Units,
CreateTableJob, tableName);
- var resNodeName2 = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+ var resNodeName2 = await
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units,
NodeNameJob);
Assert.AreEqual(resNodeName, resNodeName2);
}
finally
{
- await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob,
tableName);
+ await Client.Compute.ExecuteAsync<string>(nodes, Units,
DropTableJob, tableName);
}
}
@@ -301,7 +303,7 @@ namespace Apache.Ignite.Tests.Compute
public void
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
{
var ex = Assert.ThrowsAsync<IgniteException>(async () =>
- await Client.Compute.ExecuteAsync<object>(await
GetNodeAsync(1), ExceptionJob, "foo-bar"));
+ await Client.Compute.ExecuteAsync<object>(await
GetNodeAsync(1), Units, ExceptionJob, "foo-bar"));
Assert.AreEqual("Test exception: foo-bar", ex!.Message);
Assert.IsNotNull(ex.InnerException);
@@ -313,6 +315,78 @@ namespace Apache.Ignite.Tests.Compute
str);
}
+ [Test]
+ public async Task TestDeploymentUnitsPropagation()
+ {
+ var units = new DeploymentUnit[]
+ {
+ new("unit-latest"),
+ new("unit1", "1.0.0")
+ };
+
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ var res = await client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), units, FakeServer.GetDetailsJob);
+ StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0",
res);
+
+ // Lazy enumerable.
+ var res2 = await client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), units.Reverse(), FakeServer.GetDetailsJob);
+ StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest",
res2);
+
+ // Colocated.
+ var keyTuple = new IgniteTuple { [KeyCol] = 1L };
+ var res3 = await client.Compute.ExecuteColocatedAsync<string>(
+ FakeServer.ExistingTableName, keyTuple, units,
FakeServer.GetDetailsJob);
+
+ StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0",
res3);
+ }
+
+ [Test]
+ public void TestExecuteOnUnknownUnitWithLatestVersionThrows()
+ {
+ var deploymentUnits = new DeploymentUnit[] { new("unit-latest") };
+
+ var ex = Assert.ThrowsAsync<IgniteException>(
+ async () => await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), deploymentUnits, NodeNameJob));
+
+ StringAssert.Contains("Deployment unit unit-latest:latest doesn’t
exist", ex!.Message);
+ }
+
+ [Test]
+ public void TestExecuteColocatedOnUnknownUnitWithLatestVersionThrows()
+ {
+ var keyTuple = new IgniteTuple { [KeyCol] = 1L };
+ var deploymentUnits = new DeploymentUnit[] { new("unit-latest") };
+
+ var ex = Assert.ThrowsAsync<IgniteException>(
+ async () => await
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple,
deploymentUnits, NodeNameJob));
+
+ StringAssert.Contains("Deployment unit unit-latest:latest doesn’t
exist", ex!.Message);
+ }
+
+ [Test]
+ public void TestNullOrEmptyUnitNameThrows([Values(null, "")] string
unitName)
+ {
+ var deploymentUnits = new DeploymentUnit[] { new(unitName) };
+
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), deploymentUnits, NodeNameJob));
+
+ Assert.AreEqual("Deployment unit name can't be null or empty.",
ex!.Message);
+ }
+
+ [Test]
+ public void TestNullOrEmptyUnitVersionThrows([Values(null, "")] string
unitVersion)
+ {
+ var deploymentUnits = new DeploymentUnit[] { new("u", unitVersion)
};
+
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), deploymentUnits, NodeNameJob));
+
+ Assert.AreEqual("Deployment unit version can't be null or empty.",
ex!.Message);
+ }
+
private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
(await Client.GetClusterNodesAsync()).OrderBy(n =>
n.Name).Skip(index).Take(1).ToList();
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 9954a160ea..3eba544acf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Tests
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
+ using Ignite.Compute;
using Ignite.Sql;
using Internal.Buffers;
using Internal.Network;
@@ -48,6 +49,8 @@ namespace Apache.Ignite.Tests
public const string CustomColocationKeyTableName = "tbl3";
+ public const string GetDetailsJob = "get-details";
+
private const int ExistingTableId = 1001;
private const int CompositeKeyTableId = 1002;
@@ -619,14 +622,8 @@ namespace Apache.Ignite.Tests
case ClientOp.ComputeExecute:
{
- using var arrayBufferWriter = new
PooledArrayBuffer();
- var writer = new MsgPackWriter(arrayBufferWriter);
-
- using var builder = new BinaryTupleBuilder(3);
- builder.AppendObjectWithType(Node.Name);
- writer.Write(builder.Build().Span);
-
- Send(handler, requestId, arrayBufferWriter);
+ using var pooledArrayBuffer =
ComputeExecute(reader);
+ Send(handler, requestId, pooledArrayBuffer);
continue;
}
@@ -644,8 +641,11 @@ namespace Apache.Ignite.Tests
continue;
case ClientOp.ComputeExecuteColocated:
- Send(handler, requestId, new byte[] { 1,
MessagePackCode.Nil }.AsMemory());
+ {
+ using var pooledArrayBuffer =
ComputeExecute(reader, colocated: true);
+ Send(handler, requestId, pooledArrayBuffer);
continue;
+ }
}
// Fake error message for any other op code.
@@ -664,6 +664,46 @@ namespace Apache.Ignite.Tests
}
}
+ private PooledArrayBuffer ComputeExecute(MsgPackReader reader, bool
colocated = false)
+ {
+ // Colocated: table id, schema version, key.
+ // Else: node name.
+ reader.Skip(colocated ? 4 : 1);
+
+ var unitsCount = reader.TryReadNil() ? 0 :
reader.ReadArrayHeader();
+ var units = new List<DeploymentUnit>(unitsCount);
+ for (int i = 0; i < unitsCount; i++)
+ {
+ units.Add(new DeploymentUnit(reader.ReadString(),
reader.ReadString()));
+ }
+
+ var jobClassName = reader.ReadString();
+
+ object? resObj = jobClassName == GetDetailsJob
+ ? new
+ {
+ NodeName = Node.Name,
+ Units = string.Join(", ", units.Select(u =>
$"{u.Name}|{u.Version}")),
+ jobClassName
+ }.ToString()
+ : Node.Name;
+
+ using var builder = new BinaryTupleBuilder(3);
+ builder.AppendObjectWithType(resObj);
+
+ var arrayBufferWriter = new PooledArrayBuffer();
+ var writer = new MsgPackWriter(arrayBufferWriter);
+
+ if (colocated)
+ {
+ writer.Write(1); // Latest schema.
+ }
+
+ writer.Write(builder.Build().Span);
+
+ return arrayBufferWriter;
+ }
+
internal record struct RequestContext(int RequestCount, ClientOp
OpCode, long RequestId);
[SuppressMessage("Design", "CA1032:Implement standard exception
constructors", Justification = "Tests.")]
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
index 9a37d22972..cba4fa7a31 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
@@ -358,7 +358,7 @@ public partial class LinqSqlGenerationTests
"where ((_T0.KEY IS NOT DISTINCT FROM ?) and (_T0.VAL IS DISTINCT
FROM ?))";
const string expectedToString =
- "IgniteQueryable`1[<>f__AnonymousType4`2[String, Int64]] { Query =
" +
+ "IgniteQueryable`1[<>f__AnonymousType5`2[String, Int64]] { Query =
" +
expectedQueryText +
", Parameters = [ 3, v-2 ] }";
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index 173f70b665..aa538784c4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -21,6 +21,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
+using Ignite.Compute;
using Ignite.Table;
using Internal.Proto;
using NUnit.Framework;
@@ -297,10 +298,10 @@ public class PartitionAwarenessTests
var key = new IgniteTuple { ["ID"] = keyId };
// Warm up.
- await
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName,
key, "job");
+ await
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName,
key, Array.Empty<DeploymentUnit>(), "job");
await AssertOpOnNode(
- () =>
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName,
key, "job"),
+ () =>
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName,
key, Array.Empty<DeploymentUnit>(), "job"),
ClientOp.ComputeExecuteColocated,
expectedNode);
}
@@ -314,10 +315,12 @@ public class PartitionAwarenessTests
var key = new SimpleKey(keyId);
// Warm up.
- await client.Compute.ExecuteColocatedAsync<object?,
SimpleKey>(FakeServer.ExistingTableName, key, "job");
+ await client.Compute.ExecuteColocatedAsync<object?, SimpleKey>(
+ FakeServer.ExistingTableName, key, Array.Empty<DeploymentUnit>(),
"job");
await AssertOpOnNode(
- () => client.Compute.ExecuteColocatedAsync<object?,
SimpleKey>(FakeServer.ExistingTableName, key, "job"),
+ () => client.Compute.ExecuteColocatedAsync<object?, SimpleKey>(
+ FakeServer.ExistingTableName, key,
Array.Empty<DeploymentUnit>(), "job"),
ClientOp.ComputeExecuteColocated,
expectedNode);
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index 4b97a4cf1d..999e17581f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -25,6 +25,7 @@ using System.Linq;
using System.Numerics;
using System.Reflection;
using System.Threading.Tasks;
+using Ignite.Compute;
using Ignite.Sql;
using Ignite.Table;
using Internal.Buffers;
@@ -279,7 +280,14 @@ public class ColocationHashTests : IgniteTestsBase
{
var nodes = await Client.GetClusterNodesAsync();
- return await Client.Compute.ExecuteAsync<int>(nodes,
ColocationHashJob, count, bytes, timePrecision, timestampPrecision);
+ return await Client.Compute.ExecuteAsync<int>(
+ nodes,
+ Array.Empty<DeploymentUnit>(),
+ ColocationHashJob,
+ count,
+ bytes,
+ timePrecision,
+ timestampPrecision);
}
private record TestIndexProvider(Func<int, bool> Delegate) :
IHashedColumnIndexProvider
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
index 23b1f4e508..54f37083a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
@@ -24,6 +24,7 @@
<s:Boolean
x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean
x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean
x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
+ <s:Boolean
x:Key="/Default/UserDictionary/Words/=Colocated/@EntryIndexedValue">True</s:Boolean>
<s:Boolean
x:Key="/Default/UserDictionary/Words/=colocation/@EntryIndexedValue">True</s:Boolean>
<s:Boolean
x:Key="/Default/UserDictionary/Words/=failover/@EntryIndexedValue">True</s:Boolean>
<s:Boolean
x:Key="/Default/UserDictionary/Words/=Subquery/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/DeploymentUnit.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/DeploymentUnit.cs
new file mode 100644
index 0000000000..3597976e7e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/DeploymentUnit.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+/// <summary>
+/// Deployment unit identifier: a unit name paired with a unit version.
+/// </summary>
+/// <param name="Name">Unit name.</param>
+/// <param name="Version">Unit version. Defaults to <see cref="LatestVersion"/>.</param>
+public sealed record DeploymentUnit(
+ string Name,
+ string Version = DeploymentUnit.LatestVersion)
+{
+ /// <summary>
+ /// Version value ("latest") that <see cref="Version"/> takes when no explicit version is given.
+ /// </summary>
+ public const string LatestVersion = "latest";
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 30ae11d386..5175b475cf 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -15,60 +15,81 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Compute
-{
- using System.Collections.Generic;
- using System.Threading.Tasks;
- using Network;
- using Table;
+namespace Apache.Ignite.Compute;
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Network;
+using Table;
+/// <summary>
+/// Ignite Compute API provides distributed job execution functionality.
+/// </summary>
+public interface ICompute
+{
/// <summary>
- /// Ignite Compute API provides distributed job execution functionality.
+ /// Executes a compute job represented by the given class on one of the
specified nodes.
/// </summary>
- public interface ICompute
- {
- /// <summary>
- /// Executes a compute job represented by the given class on one of
the specified nodes.
- /// </summary>
- /// <param name="nodes">Nodes to use for the job execution.</param>
- /// <param name="jobClassName">Java class name of the job to
execute.</param>
- /// <param name="args">Job arguments.</param>
- /// <typeparam name="T">Job result type.</typeparam>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, string
jobClassName, params object?[]? args);
+ /// <param name="nodes">Nodes to use for the job execution.</param>
+ /// <param name="units">Deployment units. Can be empty.</param>
+ /// <param name="jobClassName">Java class name of the job to
execute.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <typeparam name="T">Job result type.</typeparam>
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task<T> ExecuteAsync<T>(
+ IEnumerable<IClusterNode> nodes,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args);
- /// <summary>
- /// Executes a job represented by the given class on one node where
the given key is located.
- /// </summary>
- /// <param name="tableName">Name of the table to be used with
<paramref name="key"/> to determine target node.</param>
- /// <param name="key">Table key to be used to determine the target
node for job execution.</param>
- /// <param name="jobClassName">Java class name of the job to
execute.</param>
- /// <param name="args">Job arguments.</param>
- /// <typeparam name="T">Job result type.</typeparam>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key,
string jobClassName, params object?[]? args);
+ /// <summary>
+ /// Executes a job represented by the given class on one node where the
given key is located.
+ /// </summary>
+ /// <param name="tableName">Name of the table to be used with <paramref
name="key"/> to determine target node.</param>
+ /// <param name="key">Table key to be used to determine the target node
for job execution.</param>
+ /// <param name="units">Deployment units. Can be empty.</param>
+ /// <param name="jobClassName">Java class name of the job to
execute.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <typeparam name="T">Job result type.</typeparam>
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task<T> ExecuteColocatedAsync<T>(
+ string tableName,
+ IIgniteTuple key,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args);
- /// <summary>
- /// Executes a job represented by the given class on one node where
the given key is located.
- /// </summary>
- /// <param name="tableName">Name of the table to be used with
<paramref name="key"/> to determine target node.</param>
- /// <param name="key">Table key to be used to determine the target
node for job execution.</param>
- /// <param name="jobClassName">Java class name of the job to
execute.</param>
- /// <param name="args">Job arguments.</param>
- /// <typeparam name="T">Job result type.</typeparam>
- /// <typeparam name="TKey">Key type.</typeparam>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key,
string jobClassName, params object?[]? args)
- where TKey : notnull;
+ /// <summary>
+ /// Executes a job represented by the given class on one node where the
given key is located.
+ /// </summary>
+ /// <param name="tableName">Name of the table to be used with <paramref
name="key"/> to determine target node.</param>
+ /// <param name="key">Table key to be used to determine the target node
for job execution.</param>
+ /// <param name="units">Deployment units. Can be empty.</param>
+ /// <param name="jobClassName">Java class name of the job to
execute.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <typeparam name="T">Job result type.</typeparam>
+ /// <typeparam name="TKey">Key type.</typeparam>
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task<T> ExecuteColocatedAsync<T, TKey>(
+ string tableName,
+ TKey key,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args)
+ where TKey : notnull;
- /// <summary>
- /// Executes a compute job represented by the given class on all of
the specified nodes.
- /// </summary>
- /// <param name="nodes">Nodes to use for the job execution.</param>
- /// <param name="jobClassName">Java class name of the job to
execute.</param>
- /// <param name="args">Job arguments.</param>
- /// <typeparam name="T">Job result type.</typeparam>
- /// <returns>A map of <see cref="Task"/> representing the asynchronous
operation for every node.</returns>
- IDictionary<IClusterNode, Task<T>>
BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params
object?[]? args);
- }
+ /// <summary>
+ /// Executes a compute job represented by the given class on all of the
specified nodes.
+ /// </summary>
+ /// <param name="nodes">Nodes to use for the job execution.</param>
+ /// <param name="units">Deployment units. Can be empty.</param>
+ /// <param name="jobClassName">Java class name of the job to
execute.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <typeparam name="T">Job result type.</typeparam>
+ /// <returns>A map of <see cref="Task"/> representing the asynchronous
operation for every node.</returns>
+ IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+ IEnumerable<IClusterNode> nodes,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 80754f052d..f974d186cb 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Internal.Compute
{
using System;
+ using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
@@ -29,6 +30,7 @@ namespace Apache.Ignite.Internal.Compute
using Ignite.Network;
using Ignite.Table;
using Proto;
+ using Proto.MsgPack;
using Table;
using Table.Serialization;
@@ -58,46 +60,68 @@ namespace Apache.Ignite.Internal.Compute
}
/// <inheritdoc/>
- public async Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes,
string jobClassName, params object?[]? args)
+ public async Task<T> ExecuteAsync<T>(
+ IEnumerable<IClusterNode> nodes,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args)
{
IgniteArgumentCheck.NotNull(nodes, nameof(nodes));
IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
- return await ExecuteOnOneNode<T>(GetRandomNode(nodes),
jobClassName, args).ConfigureAwait(false);
+ return await ExecuteOnOneNode<T>(GetRandomNode(nodes), units,
jobClassName, args).ConfigureAwait(false);
}
/// <inheritdoc/>
- public async Task<T> ExecuteColocatedAsync<T>(string tableName,
IIgniteTuple key, string jobClassName, params object?[]? args) =>
+ public async Task<T> ExecuteColocatedAsync<T>(
+ string tableName,
+ IIgniteTuple key,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args) =>
await ExecuteColocatedAsync<T, IIgniteTuple>(
tableName,
key,
- serializerHandlerFunc: _ =>
TupleSerializerHandler.Instance,
+ serializerHandlerFunc: static _ =>
TupleSerializerHandler.Instance,
+ units,
jobClassName,
args)
.ConfigureAwait(false);
/// <inheritdoc/>
- public async Task<T> ExecuteColocatedAsync<T, TKey>(string tableName,
TKey key, string jobClassName, params object?[]? args)
+ public async Task<T> ExecuteColocatedAsync<T, TKey>(
+ string tableName,
+ TKey key,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args)
where TKey : notnull =>
await ExecuteColocatedAsync<T, TKey>(
tableName,
key,
serializerHandlerFunc: table =>
table.GetRecordViewInternal<TKey>().RecordSerializer.Handler,
+ units,
jobClassName,
args)
.ConfigureAwait(false);
/// <inheritdoc/>
- public IDictionary<IClusterNode, Task<T>>
BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params
object?[]? args)
+ public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+ IEnumerable<IClusterNode> nodes,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ params object?[]? args)
{
IgniteArgumentCheck.NotNull(nodes, nameof(nodes));
IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
+ IgniteArgumentCheck.NotNull(units, nameof(units));
var res = new Dictionary<IClusterNode, Task<T>>();
+ var units0 = units as ICollection<DeploymentUnit> ??
units.ToList(); // Avoid multiple enumeration.
foreach (var node in nodes)
{
- var task = ExecuteOnOneNode<T>(node, jobClassName, args);
+ var task = ExecuteOnOneNode<T>(node, units0, jobClassName,
args);
res[node] = task;
}
@@ -123,7 +147,53 @@ namespace Apache.Ignite.Internal.Compute
private static ICollection<IClusterNode>
GetNodesCollection(IEnumerable<IClusterNode> nodes) =>
nodes as ICollection<IClusterNode> ?? nodes.ToList();
- private async Task<T> ExecuteOnOneNode<T>(IClusterNode node, string
jobClassName, object?[]? args)
+        /// <summary>
+        /// Writes deployment units to the request buffer as a MsgPack array of (name, version) string pairs.
+        /// </summary>
+        /// <param name="units">Deployment units. Can be empty, but not null.</param>
+        /// <param name="buf">Request buffer to write to.</param>
+        /// <exception cref="ArgumentException">A unit has a null or empty name or version.</exception>
+        private static void WriteUnits(IEnumerable<DeploymentUnit> units, PooledArrayBuffer buf)
+        {
+            // ExecuteAsync does not check units, so reject null here instead of failing inside foreach.
+            IgniteArgumentCheck.NotNull(units, nameof(units));
+
+            var w = buf.MessageWriter;
+
+            if (units is ICollection<DeploymentUnit> unitsCol)
+            {
+                // Count is known up front: write a regular array header followed by the elements.
+                w.WriteArrayHeader(unitsCol.Count);
+
+                foreach (var unit in unitsCol)
+                {
+                    ValidateUnit(unit);
+                    w.Write(unit.Name);
+                    w.Write(unit.Version);
+                }
+
+                return;
+            }
+
+            // Enumerable without known count - reserve 5 bytes for an Array32 header, enumerate, write count later.
+            // NOTE(review): countSpan is taken before the elements are written. If the pooled buffer can be
+            // re-allocated while growing, this span may no longer point into the live buffer - confirm
+            // against PooledArrayBuffer and switch to a position-based back-fill if so.
+            var count = 0;
+            var countSpan = buf.GetSpan(5);
+            buf.Advance(5);
+
+            foreach (var unit in units)
+            {
+                // Same validation as the known-count path; previously skipped for lazy enumerables.
+                ValidateUnit(unit);
+                count++;
+                w.Write(unit.Name);
+                w.Write(unit.Version);
+            }
+
+            countSpan[0] = MsgPackCode.Array32;
+            BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
+        }
+
+        /// <summary>
+        /// Validates that the deployment unit has a non-empty name and a non-empty version.
+        /// </summary>
+        /// <param name="unit">Deployment unit to validate.</param>
+        /// <exception cref="ArgumentException">Name or version is null or empty.</exception>
+        private static void ValidateUnit(DeploymentUnit unit)
+        {
+            if (string.IsNullOrEmpty(unit.Name))
+            {
+                throw new ArgumentException("Deployment unit name can't be null or empty.");
+            }
+
+            if (string.IsNullOrEmpty(unit.Version))
+            {
+                throw new ArgumentException("Deployment unit version can't be null or empty.");
+            }
+        }
+
+ private async Task<T> ExecuteOnOneNode<T>(
+ IClusterNode node,
+ IEnumerable<DeploymentUnit> units,
+ string jobClassName,
+ object?[]? args)
{
IgniteArgumentCheck.NotNull(node, nameof(node));
@@ -140,7 +210,7 @@ namespace Apache.Ignite.Internal.Compute
var w = writer.MessageWriter;
w.Write(node.Name);
- w.WriteNil(); // DeploymentUnits
+ WriteUnits(units, writer);
w.Write(jobClassName);
w.WriteObjectCollectionAsBinaryTuple(args);
}
@@ -177,6 +247,7 @@ namespace Apache.Ignite.Internal.Compute
string tableName,
TKey key,
Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc,
+ IEnumerable<DeploymentUnit> units,
string jobClassName,
params object?[]? args)
where TKey : notnull
@@ -185,6 +256,8 @@ namespace Apache.Ignite.Internal.Compute
IgniteArgumentCheck.NotNull(key, nameof(key));
IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
+ var units0 = units as ICollection<DeploymentUnit> ??
units.ToList(); // Avoid multiple enumeration.
+
while (true)
{
var table = await
GetTableAsync(tableName).ConfigureAwait(false);
@@ -219,7 +292,7 @@ namespace Apache.Ignite.Internal.Compute
var serializerHandler = serializerHandlerFunc(table);
var colocationHash = serializerHandler.Write(ref w, schema,
key, keyOnly: true, computeHash: true);
- w.WriteNil(); // DeploymentUnits
+ WriteUnits(units0, bufferWriter);
w.Write(jobClassName);
w.WriteObjectCollectionAsBinaryTuple(args);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 90f1102211..72178a5b19 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -59,6 +60,7 @@ import org.junit.jupiter.params.provider.CsvSource;
/**
* Thin client compute integration test.
*/
+@SuppressWarnings("resource")
public class ItThinClientComputeTest extends ItAbstractThinClientTest {
/** Test trace id. */
private static final UUID TRACE_ID = UUID.randomUUID();
@@ -203,6 +205,39 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertEquals(expectedNode, pojoRes);
}
+    /**
+     * Checks that executing a job on a node with an unknown deployment unit at the "latest" version
+     * fails with an {@link IgniteException} that names the missing unit.
+     */
+    @Test
+    void testExecuteOnUnknownUnitWithLatestVersionThrows() {
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () -> client().compute().<String>execute(
+                        Set.of(node(0)),
+                        List.of(new DeploymentUnit("u", "latest")),
+                        NodeNameJob.class.getName()).join());
+
+        var cause = (IgniteException) ex.getCause();
+        assertThat(cause.getMessage(), containsString("Deployment unit u:latest doesn’t exist"));
+
+        // TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does not propagate to client.
+        assertEquals(INTERNAL_ERR, cause.code());
+    }
+
+    /**
+     * Checks that a colocated execution (table {@code TABLE_NAME}, key {@code COLUMN_KEY = 1}) with an unknown
+     * deployment unit at the "latest" version fails with an {@link IgniteException} that names the missing unit.
+     */
+    @Test
+    void testExecuteColocatedOnUnknownUnitWithLatestVersionThrows() {
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () -> client().compute().<String>executeColocated(
+                        TABLE_NAME,
+                        Tuple.create().set(COLUMN_KEY, 1),
+                        List.of(new DeploymentUnit("u", "latest")),
+                        NodeNameJob.class.getName()).join());
+
+        var cause = (IgniteException) ex.getCause();
+        assertThat(cause.getMessage(), containsString("Deployment unit u:latest doesn’t exist"));
+
+        // TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does not propagate to client.
+        assertEquals(INTERNAL_ERR, cause.code());
+    }
+
@Test
void testAllSupportedArgTypes() {
testEchoArg(Byte.MAX_VALUE);