This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 60acfd0922 IGNITE-19626 .NET: Propagate compute deployment units 
(#2248)
60acfd0922 is described below

commit 60acfd0922e497f342e389837a74ef4f2affe178
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jun 26 17:36:56 2023 +0300

    IGNITE-19626 .NET: Propagate compute deployment units (#2248)
    
    * Propagate deployment units to .NET Compute API
    * Fix NPE in `DeploymentManagerImpl.detectLatestDeployedVersion`
    * Fix compute job hang in `ComputeComponentImpl.processExecuteRequest`
---
 .../org/apache/ignite/compute/IgniteCompute.java   |   8 +-
 .../apache/ignite/client/ClientComputeTest.java    |   1 +
 .../internal/deployunit/DeploymentManagerImpl.java |  16 ++-
 .../internal/deployunit/IgniteDeployment.java      |   2 +-
 .../internal/compute/ComputeComponentImpl.java     |  17 ++-
 .../Apache.Ignite.Tests/BasicAuthenticatorTests.cs |   4 +-
 .../Compute/ComputeClusterAwarenessTests.cs        |  20 +++-
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    | 120 ++++++++++++++++----
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  58 ++++++++--
 .../Linq/LinqSqlGenerationTests.cs                 |   2 +-
 .../Apache.Ignite.Tests/PartitionAwarenessTests.cs |  11 +-
 .../Proto/ColocationHashTests.cs                   |  10 +-
 .../platforms/dotnet/Apache.Ignite.sln.DotSettings |   1 +
 .../dotnet/Apache.Ignite/Compute/DeploymentUnit.cs |  33 ++++++
 .../dotnet/Apache.Ignite/Compute/ICompute.cs       | 121 ++++++++++++---------
 .../Apache.Ignite/Internal/Compute/Compute.cs      |  93 ++++++++++++++--
 .../runner/app/client/ItThinClientComputeTest.java |  35 ++++++
 17 files changed, 433 insertions(+), 119 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java 
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 038d8fe391..119c6231ea 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -36,7 +36,7 @@ public interface IgniteCompute {
      * Executes a {@link ComputeJob} of the given class on a single node from 
a set of candidate nodes.
      *
      * @param nodes    Candidate nodes; the job will be executed on one of 
them.
-     * @param units    Deployment units.
+     * @param units    Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param args     Arguments of the job.
      * @param <R>      Job result type
@@ -50,7 +50,7 @@ public interface IgniteCompute {
      *
      * @param tableName Name of the table whose key is used to determine the 
node to execute the job on.
      * @param key Key that identifies the node to execute the job on.
-     * @param units Deployment units.
+     * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param args Arguments of the job.
      * @param <R> Job result type.
@@ -65,7 +65,7 @@ public interface IgniteCompute {
      * @param tableName Name of the table whose key is used to determine the 
node to execute the job on.
      * @param key Key that identifies the node to execute the job on.
      * @param keyMapper Mapper used to map the key to a binary representation.
-     * @param units Deployment units.
+     * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param args Arguments of the job.
      * @param <R> Job result type.
@@ -84,7 +84,7 @@ public interface IgniteCompute {
      * Executes a {@link ComputeJob} of the given class on all nodes in the 
given node set.
      *
      * @param nodes Nodes to execute the job on.
-     * @param units Deployment units.
+     * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param args     Arguments of the job.
      * @param <R>      Job result type.
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index c884907379..fd6e783bb8 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -171,6 +171,7 @@ public class ClientComputeTest {
 
             assertEquals("", getUnits.apply(List.of()));
             assertEquals("u1:1.2.3", getUnits.apply(List.of(new 
DeploymentUnit("u1", "1.2.3"))));
+            assertEquals("u:latest", getUnits.apply(List.of(new 
DeploymentUnit("u", "LaTeSt"))));
             assertEquals(
                     "u1:1.2.3,unit2:latest",
                     getUnits.apply(List.of(new DeploymentUnit("u1", "1.2.3"), 
new DeploymentUnit("unit2", Version.LATEST))));
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 4f24d6a31f..3b4c0f28dd 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -339,11 +339,17 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
     @Override
     public CompletableFuture<Version> detectLatestDeployedVersion(String id) {
         return clusterStatusesAsync(id)
-                .thenApply(statuses -> statuses.versionStatuses().stream()
-                        .filter(e -> e.getStatus() == DEPLOYED)
-                        .reduce((first, second) -> second)
-                        .orElseThrow(() -> new 
DeploymentUnitNotFoundException(id))
-                        .getVersion());
+                .thenApply(statuses -> {
+                    if (statuses == null) {
+                        throw new DeploymentUnitNotFoundException(id, 
Version.LATEST);
+                    }
+
+                    return statuses.versionStatuses().stream()
+                            .filter(e -> e.getStatus() == DEPLOYED)
+                            .reduce((first, second) -> second)
+                            .orElseThrow(() -> new 
DeploymentUnitNotFoundException(id, Version.LATEST))
+                            .getVersion();
+                });
     }
 
     @Override
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
index 2f3a7782d9..980427f091 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
@@ -86,7 +86,7 @@ public interface IgniteDeployment extends IgniteComponent {
      * Lists all versions of the unit.
      *
      * @param id Unit identifier.
-     * @return Future with the unit statuses.
+     * @return Future with the unit statuses. Result of the future can be null 
when the specified unit does not exist.
      */
     CompletableFuture<UnitStatuses> clusterStatusesAsync(String id);
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 3aa401cfb4..5514632f4d 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -238,12 +238,19 @@ public class ComputeComponentImpl implements 
ComputeComponent {
             List<DeploymentUnit> units = 
toDeploymentUnit(executeRequest.deploymentUnits());
 
             mapClassLoaderExceptions(jobClassLoader(units), 
executeRequest.jobClassName())
-                    .thenCompose(context -> {
-                        return 
doExecuteLocally(jobClass(context.classLoader(), 
executeRequest.jobClassName()), executeRequest.args())
-                                        .whenComplete((r, e) -> 
context.close())
-                                        .handle((result, ex) -> 
sendExecuteResponse(result, ex, senderConsistentId, correlationId));
+                    .whenComplete((context, err) -> {
+                        if (err != null) {
+                            if (context != null) {
+                                context.close();
                             }
-                    );
+
+                            sendExecuteResponse(null, err, senderConsistentId, 
correlationId);
+                        }
+
+                        doExecuteLocally(jobClass(context.classLoader(), 
executeRequest.jobClassName()), executeRequest.args())
+                                .whenComplete((r, e) -> context.close())
+                                .handle((result, ex) -> 
sendExecuteResponse(result, ex, senderConsistentId, correlationId));
+                    });
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
index 66eb808ba0..8a98732cd2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/BasicAuthenticatorTests.cs
@@ -17,7 +17,9 @@
 
 namespace Apache.Ignite.Tests;
 
+using System;
 using System.Threading.Tasks;
+using Ignite.Compute;
 using NUnit.Framework;
 using Security;
 
@@ -101,7 +103,7 @@ public class BasicAuthenticatorTests : IgniteTestsBase
 
         try
         {
-            await client.Compute.ExecuteAsync<object>(nodes, EnableAuthnJob, 
enable ? 1 : 0);
+            await client.Compute.ExecuteAsync<object>(nodes, 
Array.Empty<DeploymentUnit>(), EnableAuthnJob, enable ? 1 : 0);
         }
         catch (IgniteClientConnectionException)
         {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index 43c796c822..ee3e78a8b1 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -17,9 +17,11 @@
 
 namespace Apache.Ignite.Tests.Compute
 {
+    using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Threading.Tasks;
+    using Ignite.Compute;
     using Internal.Proto;
     using NUnit.Framework;
 
@@ -45,8 +47,11 @@ namespace Apache.Ignite.Tests.Compute
             // ReSharper disable once AccessToDisposedClosure
             TestUtils.WaitForCondition(() => client.GetConnections().Count == 
3, 5000);
 
-            var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server2.Node }, jobClassName: string.Empty);
-            var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server3.Node }, jobClassName: string.Empty);
+            var res2 = await client.Compute.ExecuteAsync<string>(
+                new[] { server2.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
+
+            var res3 = await client.Compute.ExecuteAsync<string>(
+                new[] { server3.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
             Assert.AreEqual("s2", res2);
             Assert.AreEqual("s3", res3);
@@ -66,8 +71,11 @@ namespace Apache.Ignite.Tests.Compute
 
             using var client = await server1.ConnectClientAsync();
 
-            var res2 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server2.Node }, jobClassName: string.Empty);
-            var res3 = await client.Compute.ExecuteAsync<string>(nodes: new[] 
{ server3.Node }, jobClassName: string.Empty);
+            var res2 = await client.Compute.ExecuteAsync<string>(
+                new[] { server2.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
+
+            var res3 = await client.Compute.ExecuteAsync<string>(
+                new[] { server3.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
             Assert.AreEqual("s1", res2);
             Assert.AreEqual("s1", res3);
@@ -101,7 +109,9 @@ namespace Apache.Ignite.Tests.Compute
             for (int i = 0; i < 100; i++)
             {
                 var node = i % 2 == 0 ? server1.Node : server2.Node;
-                var res = await client.Compute.ExecuteAsync<string>(nodes: 
new[] { node }, jobClassName: string.Empty);
+                var res = await client.Compute.ExecuteAsync<string>(
+                    new[] { node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
+
                 nodeNames.Add(res);
             }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 23fff5ffd6..5a610e3b27 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -55,6 +55,8 @@ namespace Apache.Ignite.Tests.Compute
 
         private const string ExceptionJob = PlatformTestNodeRunner + 
"$ExceptionJob";
 
+        private static readonly IList<DeploymentUnit> Units = 
Array.Empty<DeploymentUnit>();
+
         [Test]
         public async Task TestGetClusterNodes()
         {
@@ -76,8 +78,8 @@ namespace Apache.Ignite.Tests.Compute
         [Test]
         public async Task TestExecuteOnSpecificNode()
         {
-            var res1 = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(0), NodeNameJob, "-", 11);
-            var res2 = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), NodeNameJob, ":", 22);
+            var res1 = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(0), Units, NodeNameJob, "-", 11);
+            var res2 = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, NodeNameJob, ":", 22);
 
             Assert.AreEqual(PlatformTestNodeRunner + "-_11", res1);
             Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", res2);
@@ -86,7 +88,7 @@ namespace Apache.Ignite.Tests.Compute
         [Test]
         public async Task TestExecuteOnRandomNode()
         {
-            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), NodeNameJob);
+            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, NodeNameJob);
 
             var expectedNodeNames = Enumerable.Range(1, 4)
                 .Select(x => x == 1 ? PlatformTestNodeRunner : 
PlatformTestNodeRunner + "_" + x)
@@ -99,7 +101,7 @@ namespace Apache.Ignite.Tests.Compute
         public void TestExecuteResultTypeMismatchThrowsInvalidCastException()
         {
             Assert.ThrowsAsync<InvalidCastException>(async () =>
-                await Client.Compute.ExecuteAsync<Guid>(await 
Client.GetClusterNodesAsync(), NodeNameJob));
+                await Client.Compute.ExecuteAsync<Guid>(await 
Client.GetClusterNodesAsync(), Units, NodeNameJob));
         }
 
         [Test]
@@ -107,7 +109,7 @@ namespace Apache.Ignite.Tests.Compute
         {
             var nodes = await GetNodeAsync(0);
 
-            IDictionary<IClusterNode, Task<string>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, NodeNameJob, "123");
+            IDictionary<IClusterNode, Task<string>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
             var res = await taskMap[nodes[0]];
 
             Assert.AreEqual(1, taskMap.Count);
@@ -121,7 +123,7 @@ namespace Apache.Ignite.Tests.Compute
         {
             var nodes = await Client.GetClusterNodesAsync();
 
-            IDictionary<IClusterNode, Task<string>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, NodeNameJob, "123");
+            IDictionary<IClusterNode, Task<string>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
             var res1 = await taskMap[nodes[0]];
             var res2 = await taskMap[nodes[1]];
             var res3 = await taskMap[nodes[2]];
@@ -138,7 +140,7 @@ namespace Apache.Ignite.Tests.Compute
         [Test]
         public async Task TestExecuteWithArgs()
         {
-            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), ConcatJob, 1.1, Guid.Empty, "3", null);
+            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ConcatJob, 1.1, Guid.Empty, "3", null);
 
             Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null", 
res);
         }
@@ -146,7 +148,7 @@ namespace Apache.Ignite.Tests.Compute
         [Test]
         public async Task TestExecuteWithNullArgs()
         {
-            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), ConcatJob, args: null);
+            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ConcatJob, args: null);
 
             Assert.IsNull(res);
         }
@@ -155,7 +157,7 @@ namespace Apache.Ignite.Tests.Compute
         public void TestJobErrorPropagatesToClientWithClassAndMessage()
         {
             var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), ErrorJob, "unused"));
+                await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ErrorJob, "unused"));
 
             StringAssert.Contains("Custom job error", ex!.Message);
 
@@ -175,7 +177,7 @@ namespace Apache.Ignite.Tests.Compute
             var unknownNode = new ClusterNode("x", "y", new 
IPEndPoint(IPAddress.Loopback, 0));
 
             var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteAsync<string>(new[] { unknownNode 
}, EchoJob, "unused"));
+                await Client.Compute.ExecuteAsync<string>(new[] { unknownNode 
}, Units, EchoJob, "unused"));
 
             StringAssert.Contains("Specified node is not present in the 
cluster: y", ex!.Message);
         }
@@ -223,7 +225,7 @@ namespace Apache.Ignite.Tests.Compute
             {
                 var nodes = await Client.GetClusterNodesAsync();
                 var str = expectedStr ?? val.ToString()!.Replace("E+", "E");
-                var res = await Client.Compute.ExecuteAsync<object>(nodes, 
EchoJob, val, str);
+                var res = await Client.Compute.ExecuteAsync<object>(nodes, 
Units, EchoJob, val, str);
 
                 Assert.AreEqual(val, res);
             }
@@ -238,13 +240,13 @@ namespace Apache.Ignite.Tests.Compute
         public async Task TestExecuteColocated(long key, string nodeName)
         {
             var keyTuple = new IgniteTuple { [KeyCol] = key };
-            var resNodeName = await 
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, NodeNameJob);
+            var resNodeName = await 
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, Units, 
NodeNameJob);
 
             var keyPoco = new Poco { Key = key };
-            var resNodeName2 = await 
Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco, 
NodeNameJob);
+            var resNodeName2 = await 
Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco, 
Units.Reverse(), NodeNameJob);
 
             var keyPocoStruct = new PocoStruct(key, null);
-            var resNodeName3 = await 
Client.Compute.ExecuteColocatedAsync<string, PocoStruct>(TableName, 
keyPocoStruct, NodeNameJob);
+            var resNodeName3 = await 
Client.Compute.ExecuteColocatedAsync<string, PocoStruct>(TableName, 
keyPocoStruct, Units, NodeNameJob);
 
             var expectedNodeName = PlatformTestNodeRunner + nodeName;
             Assert.AreEqual(expectedNodeName, resNodeName);
@@ -256,7 +258,7 @@ namespace Apache.Ignite.Tests.Compute
         public void TestExecuteColocatedThrowsWhenTableDoesNotExist()
         {
             var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
-                await 
Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(), 
EchoJob));
+                await 
Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(), 
Units, EchoJob));
 
             Assert.AreEqual("Table 'unknownTable' does not exist.", 
ex!.Message);
         }
@@ -265,7 +267,7 @@ namespace Apache.Ignite.Tests.Compute
         public void TestExecuteColocatedThrowsWhenKeyColumnIsMissing()
         {
             var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteColocatedAsync<string>(TableName, 
new IgniteTuple(), EchoJob));
+                await Client.Compute.ExecuteColocatedAsync<string>(TableName, 
new IgniteTuple(), Units, EchoJob));
 
             StringAssert.Contains("Missed key column: KEY", ex!.Message);
         }
@@ -275,25 +277,25 @@ namespace Apache.Ignite.Tests.Compute
         {
             // Create table and use it in ExecuteColocated.
             var nodes = await GetNodeAsync(0);
-            var tableName = await Client.Compute.ExecuteAsync<string>(nodes, 
CreateTableJob, "drop_me");
+            var tableName = await Client.Compute.ExecuteAsync<string>(nodes, 
Units, CreateTableJob, "drop_me");
 
             try
             {
                 var keyTuple = new IgniteTuple { [KeyCol] = 1L };
-                var resNodeName = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+                var resNodeName = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, 
NodeNameJob);
 
                 // Drop table and create a new one with a different ID, then 
execute a computation again.
                 // This should update the cached table and complete the 
computation successfully.
-                await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, 
tableName);
-                await Client.Compute.ExecuteAsync<string>(nodes, 
CreateTableJob, tableName);
+                await Client.Compute.ExecuteAsync<string>(nodes, Units, 
DropTableJob, tableName);
+                await Client.Compute.ExecuteAsync<string>(nodes, Units, 
CreateTableJob, tableName);
 
-                var resNodeName2 = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+                var resNodeName2 = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, 
NodeNameJob);
 
                 Assert.AreEqual(resNodeName, resNodeName2);
             }
             finally
             {
-                await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, 
tableName);
+                await Client.Compute.ExecuteAsync<string>(nodes, Units, 
DropTableJob, tableName);
             }
         }
 
@@ -301,7 +303,7 @@ namespace Apache.Ignite.Tests.Compute
         public void 
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
         {
             var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), ExceptionJob, "foo-bar"));
+                await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), Units, ExceptionJob, "foo-bar"));
 
             Assert.AreEqual("Test exception: foo-bar", ex!.Message);
             Assert.IsNotNull(ex.InnerException);
@@ -313,6 +315,78 @@ namespace Apache.Ignite.Tests.Compute
                 str);
         }
 
+        [Test]
+        public async Task TestDeploymentUnitsPropagation()
+        {
+            var units = new DeploymentUnit[]
+            {
+                new("unit-latest"),
+                new("unit1", "1.0.0")
+            };
+
+            using var server = new FakeServer();
+            using var client = await server.ConnectClientAsync();
+
+            var res = await client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), units, FakeServer.GetDetailsJob);
+            StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", 
res);
+
+            // Lazy enumerable.
+            var res2 = await client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), units.Reverse(), FakeServer.GetDetailsJob);
+            StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest", 
res2);
+
+            // Colocated.
+            var keyTuple = new IgniteTuple { [KeyCol] = 1L };
+            var res3 = await client.Compute.ExecuteColocatedAsync<string>(
+                FakeServer.ExistingTableName, keyTuple, units, 
FakeServer.GetDetailsJob);
+
+            StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", 
res3);
+        }
+
+        [Test]
+        public void TestExecuteOnUnknownUnitWithLatestVersionThrows()
+        {
+            var deploymentUnits = new DeploymentUnit[] { new("unit-latest") };
+
+            var ex = Assert.ThrowsAsync<IgniteException>(
+                async () => await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), deploymentUnits, NodeNameJob));
+
+            StringAssert.Contains("Deployment unit unit-latest:latest doesn’t 
exist", ex!.Message);
+        }
+
+        [Test]
+        public void TestExecuteColocatedOnUnknownUnitWithLatestVersionThrows()
+        {
+            var keyTuple = new IgniteTuple { [KeyCol] = 1L };
+            var deploymentUnits = new DeploymentUnit[] { new("unit-latest") };
+
+            var ex = Assert.ThrowsAsync<IgniteException>(
+                async () => await 
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, 
deploymentUnits, NodeNameJob));
+
+            StringAssert.Contains("Deployment unit unit-latest:latest doesn’t 
exist", ex!.Message);
+        }
+
+        [Test]
+        public void TestNullOrEmptyUnitNameThrows([Values(null, "")] string 
unitName)
+        {
+            var deploymentUnits = new DeploymentUnit[] { new(unitName) };
+
+            var ex = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), deploymentUnits, NodeNameJob));
+
+            Assert.AreEqual("Deployment unit name can't be null or empty.", 
ex!.Message);
+        }
+
+        [Test]
+        public void TestNullOrEmptyUnitVersionThrows([Values(null, "")] string 
unitVersion)
+        {
+            var deploymentUnits = new DeploymentUnit[] { new("u", unitVersion) 
};
+
+            var ex = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), deploymentUnits, NodeNameJob));
+
+            Assert.AreEqual("Deployment unit version can't be null or empty.", 
ex!.Message);
+        }
+
         private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
             (await Client.GetClusterNodesAsync()).OrderBy(n => 
n.Name).Skip(index).Take(1).ToList();
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 9954a160ea..3eba544acf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Tests
     using System.Net.Sockets;
     using System.Threading;
     using System.Threading.Tasks;
+    using Ignite.Compute;
     using Ignite.Sql;
     using Internal.Buffers;
     using Internal.Network;
@@ -48,6 +49,8 @@ namespace Apache.Ignite.Tests
 
         public const string CustomColocationKeyTableName = "tbl3";
 
+        public const string GetDetailsJob = "get-details";
+
         private const int ExistingTableId = 1001;
 
         private const int CompositeKeyTableId = 1002;
@@ -619,14 +622,8 @@ namespace Apache.Ignite.Tests
 
                         case ClientOp.ComputeExecute:
                         {
-                            using var arrayBufferWriter = new 
PooledArrayBuffer();
-                            var writer = new MsgPackWriter(arrayBufferWriter);
-
-                            using var builder = new BinaryTupleBuilder(3);
-                            builder.AppendObjectWithType(Node.Name);
-                            writer.Write(builder.Build().Span);
-
-                            Send(handler, requestId, arrayBufferWriter);
+                            using var pooledArrayBuffer = 
ComputeExecute(reader);
+                            Send(handler, requestId, pooledArrayBuffer);
                             continue;
                         }
 
@@ -644,8 +641,11 @@ namespace Apache.Ignite.Tests
                             continue;
 
                         case ClientOp.ComputeExecuteColocated:
-                            Send(handler, requestId, new byte[] { 1, 
MessagePackCode.Nil }.AsMemory());
+                        {
+                            using var pooledArrayBuffer = 
ComputeExecute(reader, colocated: true);
+                            Send(handler, requestId, pooledArrayBuffer);
                             continue;
+                        }
                     }
 
                     // Fake error message for any other op code.
@@ -664,6 +664,46 @@ namespace Apache.Ignite.Tests
             }
         }
 
+        private PooledArrayBuffer ComputeExecute(MsgPackReader reader, bool 
colocated = false)
+        {
+            // Colocated: table id, schema version, key.
+            // Else: node name.
+            reader.Skip(colocated ? 4 : 1);
+
+            var unitsCount = reader.TryReadNil() ? 0 : 
reader.ReadArrayHeader();
+            var units = new List<DeploymentUnit>(unitsCount);
+            for (int i = 0; i < unitsCount; i++)
+            {
+                units.Add(new DeploymentUnit(reader.ReadString(), 
reader.ReadString()));
+            }
+
+            var jobClassName = reader.ReadString();
+
+            object? resObj = jobClassName == GetDetailsJob
+                ? new
+                {
+                    NodeName = Node.Name,
+                    Units = string.Join(", ", units.Select(u => 
$"{u.Name}|{u.Version}")),
+                    jobClassName
+                }.ToString()
+                : Node.Name;
+
+            using var builder = new BinaryTupleBuilder(3);
+            builder.AppendObjectWithType(resObj);
+
+            var arrayBufferWriter = new PooledArrayBuffer();
+            var writer = new MsgPackWriter(arrayBufferWriter);
+
+            if (colocated)
+            {
+                writer.Write(1); // Latest schema.
+            }
+
+            writer.Write(builder.Build().Span);
+
+            return arrayBufferWriter;
+        }
+
         internal record struct RequestContext(int RequestCount, ClientOp 
OpCode, long RequestId);
 
         [SuppressMessage("Design", "CA1032:Implement standard exception 
constructors", Justification = "Tests.")]
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
index 9a37d22972..cba4fa7a31 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
@@ -358,7 +358,7 @@ public partial class LinqSqlGenerationTests
             "where ((_T0.KEY IS NOT DISTINCT FROM ?) and (_T0.VAL IS DISTINCT 
FROM ?))";
 
         const string expectedToString =
-            "IgniteQueryable`1[<>f__AnonymousType4`2[String, Int64]] { Query = 
" +
+            "IgniteQueryable`1[<>f__AnonymousType5`2[String, Int64]] { Query = 
" +
             expectedQueryText +
             ", Parameters = [ 3, v-2 ] }";
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index 173f70b665..aa538784c4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -21,6 +21,7 @@ using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
+using Ignite.Compute;
 using Ignite.Table;
 using Internal.Proto;
 using NUnit.Framework;
@@ -297,10 +298,10 @@ public class PartitionAwarenessTests
         var key = new IgniteTuple { ["ID"] = keyId };
 
         // Warm up.
-        await 
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName, 
key, "job");
+        await 
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName, 
key, Array.Empty<DeploymentUnit>(), "job");
 
         await AssertOpOnNode(
-            () => 
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName, 
key, "job"),
+            () => 
client.Compute.ExecuteColocatedAsync<object?>(FakeServer.ExistingTableName, 
key, Array.Empty<DeploymentUnit>(), "job"),
             ClientOp.ComputeExecuteColocated,
             expectedNode);
     }
@@ -314,10 +315,12 @@ public class PartitionAwarenessTests
         var key = new SimpleKey(keyId);
 
         // Warm up.
-        await client.Compute.ExecuteColocatedAsync<object?, 
SimpleKey>(FakeServer.ExistingTableName, key, "job");
+        await client.Compute.ExecuteColocatedAsync<object?, SimpleKey>(
+            FakeServer.ExistingTableName, key, Array.Empty<DeploymentUnit>(), 
"job");
 
         await AssertOpOnNode(
-            () => client.Compute.ExecuteColocatedAsync<object?, 
SimpleKey>(FakeServer.ExistingTableName, key, "job"),
+            () => client.Compute.ExecuteColocatedAsync<object?, SimpleKey>(
+                FakeServer.ExistingTableName, key, 
Array.Empty<DeploymentUnit>(), "job"),
             ClientOp.ComputeExecuteColocated,
             expectedNode);
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index 4b97a4cf1d..999e17581f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -25,6 +25,7 @@ using System.Linq;
 using System.Numerics;
 using System.Reflection;
 using System.Threading.Tasks;
+using Ignite.Compute;
 using Ignite.Sql;
 using Ignite.Table;
 using Internal.Buffers;
@@ -279,7 +280,14 @@ public class ColocationHashTests : IgniteTestsBase
     {
         var nodes = await Client.GetClusterNodesAsync();
 
-        return await Client.Compute.ExecuteAsync<int>(nodes, 
ColocationHashJob, count, bytes, timePrecision, timestampPrecision);
+        return await Client.Compute.ExecuteAsync<int>(
+            nodes,
+            Array.Empty<DeploymentUnit>(),
+            ColocationHashJob,
+            count,
+            bytes,
+            timePrecision,
+            timestampPrecision);
     }
 
     private record TestIndexProvider(Func<int, bool> Delegate) : 
IHashedColumnIndexProvider
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings 
b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
index 23b1f4e508..54f37083a4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
@@ -24,6 +24,7 @@
        <s:Boolean 
x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
        <s:Boolean 
x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
        <s:Boolean 
x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
+       <s:Boolean 
x:Key="/Default/UserDictionary/Words/=Colocated/@EntryIndexedValue">True</s:Boolean>
        <s:Boolean 
x:Key="/Default/UserDictionary/Words/=colocation/@EntryIndexedValue">True</s:Boolean>
        <s:Boolean 
x:Key="/Default/UserDictionary/Words/=failover/@EntryIndexedValue">True</s:Boolean>
        <s:Boolean 
x:Key="/Default/UserDictionary/Words/=Subquery/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/DeploymentUnit.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/DeploymentUnit.cs
new file mode 100644
index 0000000000..3597976e7e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/DeploymentUnit.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+/// <summary>
+/// Deployment unit identifier.
+/// </summary>
+/// <param name="Name">Unit name.</param>
+/// <param name="Version">Unit version. Defaults to <see 
cref="LatestVersion"/>.</param>
+public sealed record DeploymentUnit(
+    string Name,
+    string Version = DeploymentUnit.LatestVersion)
+{
+    /// <summary>
+    /// Latest version.
+    /// </summary>
+    public const string LatestVersion = "latest";
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 30ae11d386..5175b475cf 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -15,60 +15,81 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Compute
-{
-    using System.Collections.Generic;
-    using System.Threading.Tasks;
-    using Network;
-    using Table;
+namespace Apache.Ignite.Compute;
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Network;
+using Table;
 
+/// <summary>
+/// Ignite Compute API provides distributed job execution functionality.
+/// </summary>
+public interface ICompute
+{
     /// <summary>
-    /// Ignite Compute API provides distributed job execution functionality.
+    /// Executes a compute job represented by the given class on one of the 
specified nodes.
     /// </summary>
-    public interface ICompute
-    {
-        /// <summary>
-        /// Executes a compute job represented by the given class on one of 
the specified nodes.
-        /// </summary>
-        /// <param name="nodes">Nodes to use for the job execution.</param>
-        /// <param name="jobClassName">Java class name of the job to 
execute.</param>
-        /// <param name="args">Job arguments.</param>
-        /// <typeparam name="T">Job result type.</typeparam>
-        /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-        Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, string 
jobClassName, params object?[]? args);
+    /// <param name="nodes">Nodes to use for the job execution.</param>
+    /// <param name="units">Deployment units. Can be empty.</param>
+    /// <param name="jobClassName">Java class name of the job to 
execute.</param>
+    /// <param name="args">Job arguments.</param>
+    /// <typeparam name="T">Job result type.</typeparam>
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    Task<T> ExecuteAsync<T>(
+        IEnumerable<IClusterNode> nodes,
+        IEnumerable<DeploymentUnit> units,
+        string jobClassName,
+        params object?[]? args);
 
-        /// <summary>
-        /// Executes a job represented by the given class on one node where 
the given key is located.
-        /// </summary>
-        /// <param name="tableName">Name of the table to be used with 
<paramref name="key"/> to determine target node.</param>
-        /// <param name="key">Table key to be used to determine the target 
node for job execution.</param>
-        /// <param name="jobClassName">Java class name of the job to 
execute.</param>
-        /// <param name="args">Job arguments.</param>
-        /// <typeparam name="T">Job result type.</typeparam>
-        /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-        Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, 
string jobClassName, params object?[]? args);
+    /// <summary>
+    /// Executes a job represented by the given class on one node where the 
given key is located.
+    /// </summary>
+    /// <param name="tableName">Name of the table to be used with <paramref 
name="key"/> to determine target node.</param>
+    /// <param name="key">Table key to be used to determine the target node 
for job execution.</param>
+    /// <param name="units">Deployment units. Can be empty.</param>
+    /// <param name="jobClassName">Java class name of the job to 
execute.</param>
+    /// <param name="args">Job arguments.</param>
+    /// <typeparam name="T">Job result type.</typeparam>
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    Task<T> ExecuteColocatedAsync<T>(
+        string tableName,
+        IIgniteTuple key,
+        IEnumerable<DeploymentUnit> units,
+        string jobClassName,
+        params object?[]? args);
 
-        /// <summary>
-        /// Executes a job represented by the given class on one node where 
the given key is located.
-        /// </summary>
-        /// <param name="tableName">Name of the table to be used with 
<paramref name="key"/> to determine target node.</param>
-        /// <param name="key">Table key to be used to determine the target 
node for job execution.</param>
-        /// <param name="jobClassName">Java class name of the job to 
execute.</param>
-        /// <param name="args">Job arguments.</param>
-        /// <typeparam name="T">Job result type.</typeparam>
-        /// <typeparam name="TKey">Key type.</typeparam>
-        /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-        Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, 
string jobClassName, params object?[]? args)
-            where TKey : notnull;
+    /// <summary>
+    /// Executes a job represented by the given class on one node where the 
given key is located.
+    /// </summary>
+    /// <param name="tableName">Name of the table to be used with <paramref 
name="key"/> to determine target node.</param>
+    /// <param name="key">Table key to be used to determine the target node 
for job execution.</param>
+    /// <param name="units">Deployment units. Can be empty.</param>
+    /// <param name="jobClassName">Java class name of the job to 
execute.</param>
+    /// <param name="args">Job arguments.</param>
+    /// <typeparam name="T">Job result type.</typeparam>
+    /// <typeparam name="TKey">Key type.</typeparam>
+    /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
+    Task<T> ExecuteColocatedAsync<T, TKey>(
+        string tableName,
+        TKey key,
+        IEnumerable<DeploymentUnit> units,
+        string jobClassName,
+        params object?[]? args)
+        where TKey : notnull;
 
-        /// <summary>
-        /// Executes a compute job represented by the given class on all of 
the specified nodes.
-        /// </summary>
-        /// <param name="nodes">Nodes to use for the job execution.</param>
-        /// <param name="jobClassName">Java class name of the job to 
execute.</param>
-        /// <param name="args">Job arguments.</param>
-        /// <typeparam name="T">Job result type.</typeparam>
-        /// <returns>A map of <see cref="Task"/> representing the asynchronous 
operation for every node.</returns>
-        IDictionary<IClusterNode, Task<T>> 
BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params 
object?[]? args);
-    }
+    /// <summary>
+    /// Executes a compute job represented by the given class on all of the 
specified nodes.
+    /// </summary>
+    /// <param name="nodes">Nodes to use for the job execution.</param>
+    /// <param name="units">Deployment units. Can be empty.</param>
+    /// <param name="jobClassName">Java class name of the job to 
execute.</param>
+    /// <param name="args">Job arguments.</param>
+    /// <typeparam name="T">Job result type.</typeparam>
+    /// <returns>A map of <see cref="Task"/> representing the asynchronous 
operation for every node.</returns>
+    IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+        IEnumerable<IClusterNode> nodes,
+        IEnumerable<DeploymentUnit> units,
+        string jobClassName,
+        params object?[]? args);
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 80754f052d..f974d186cb 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Internal.Compute
 {
     using System;
+    using System.Buffers.Binary;
     using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Diagnostics.CodeAnalysis;
@@ -29,6 +30,7 @@ namespace Apache.Ignite.Internal.Compute
     using Ignite.Network;
     using Ignite.Table;
     using Proto;
+    using Proto.MsgPack;
     using Table;
     using Table.Serialization;
 
@@ -58,46 +60,68 @@ namespace Apache.Ignite.Internal.Compute
         }
 
         /// <inheritdoc/>
-        public async Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, 
string jobClassName, params object?[]? args)
+        public async Task<T> ExecuteAsync<T>(
+            IEnumerable<IClusterNode> nodes,
+            IEnumerable<DeploymentUnit> units,
+            string jobClassName,
+            params object?[]? args)
         {
             IgniteArgumentCheck.NotNull(nodes, nameof(nodes));
             IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
 
-            return await ExecuteOnOneNode<T>(GetRandomNode(nodes), 
jobClassName, args).ConfigureAwait(false);
+            return await ExecuteOnOneNode<T>(GetRandomNode(nodes), units, 
jobClassName, args).ConfigureAwait(false);
         }
 
         /// <inheritdoc/>
-        public async Task<T> ExecuteColocatedAsync<T>(string tableName, 
IIgniteTuple key, string jobClassName, params object?[]? args) =>
+        public async Task<T> ExecuteColocatedAsync<T>(
+            string tableName,
+            IIgniteTuple key,
+            IEnumerable<DeploymentUnit> units,
+            string jobClassName,
+            params object?[]? args) =>
             await ExecuteColocatedAsync<T, IIgniteTuple>(
                     tableName,
                     key,
-                    serializerHandlerFunc: _ => 
TupleSerializerHandler.Instance,
+                    serializerHandlerFunc: static _ => 
TupleSerializerHandler.Instance,
+                    units,
                     jobClassName,
                     args)
                 .ConfigureAwait(false);
 
         /// <inheritdoc/>
-        public async Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, 
TKey key, string jobClassName, params object?[]? args)
+        public async Task<T> ExecuteColocatedAsync<T, TKey>(
+            string tableName,
+            TKey key,
+            IEnumerable<DeploymentUnit> units,
+            string jobClassName,
+            params object?[]? args)
             where TKey : notnull =>
             await ExecuteColocatedAsync<T, TKey>(
                     tableName,
                     key,
                     serializerHandlerFunc: table => 
table.GetRecordViewInternal<TKey>().RecordSerializer.Handler,
+                    units,
                     jobClassName,
                     args)
                 .ConfigureAwait(false);
 
         /// <inheritdoc/>
-        public IDictionary<IClusterNode, Task<T>> 
BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params 
object?[]? args)
+        public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+            IEnumerable<IClusterNode> nodes,
+            IEnumerable<DeploymentUnit> units,
+            string jobClassName,
+            params object?[]? args)
         {
             IgniteArgumentCheck.NotNull(nodes, nameof(nodes));
             IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
+            IgniteArgumentCheck.NotNull(units, nameof(units));
 
             var res = new Dictionary<IClusterNode, Task<T>>();
+            var units0 = units as ICollection<DeploymentUnit> ?? 
units.ToList(); // Avoid multiple enumeration.
 
             foreach (var node in nodes)
             {
-                var task = ExecuteOnOneNode<T>(node, jobClassName, args);
+                var task = ExecuteOnOneNode<T>(node, units0, jobClassName, 
args);
 
                 res[node] = task;
             }
@@ -123,7 +147,53 @@ namespace Apache.Ignite.Internal.Compute
         private static ICollection<IClusterNode> 
GetNodesCollection(IEnumerable<IClusterNode> nodes) =>
             nodes as ICollection<IClusterNode> ?? nodes.ToList();
 
-        private async Task<T> ExecuteOnOneNode<T>(IClusterNode node, string 
jobClassName, object?[]? args)
+        private static void WriteUnits(IEnumerable<DeploymentUnit> units, 
PooledArrayBuffer buf)
+        {
+            var w = buf.MessageWriter;
+
+            if (units is ICollection<DeploymentUnit> unitsCol)
+            {
+                w.WriteArrayHeader(unitsCol.Count);
+                foreach (var unit in units)
+                {
+                    if (string.IsNullOrEmpty(unit.Name))
+                    {
+                        throw new ArgumentException("Deployment unit name 
can't be null or empty.");
+                    }
+
+                    if (string.IsNullOrEmpty(unit.Version))
+                    {
+                        throw new ArgumentException("Deployment unit version 
can't be null or empty.");
+                    }
+
+                    w.Write(unit.Name);
+                    w.Write(unit.Version);
+                }
+
+                return;
+            }
+
+            // Enumerable without known count - enumerate first, write count 
later.
+            var count = 0;
+            var countSpan = buf.GetSpan(5);
+            buf.Advance(5);
+
+            foreach (var unit in units)
+            {
+                count++;
+                w.Write(unit.Name);
+                w.Write(unit.Version);
+            }
+
+            countSpan[0] = MsgPackCode.Array32;
+            BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
+        }
+
+        private async Task<T> ExecuteOnOneNode<T>(
+            IClusterNode node,
+            IEnumerable<DeploymentUnit> units,
+            string jobClassName,
+            object?[]? args)
         {
             IgniteArgumentCheck.NotNull(node, nameof(node));
 
@@ -140,7 +210,7 @@ namespace Apache.Ignite.Internal.Compute
                 var w = writer.MessageWriter;
 
                 w.Write(node.Name);
-                w.WriteNil(); // DeploymentUnits
+                WriteUnits(units, writer);
                 w.Write(jobClassName);
                 w.WriteObjectCollectionAsBinaryTuple(args);
             }
@@ -177,6 +247,7 @@ namespace Apache.Ignite.Internal.Compute
             string tableName,
             TKey key,
             Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc,
+            IEnumerable<DeploymentUnit> units,
             string jobClassName,
             params object?[]? args)
             where TKey : notnull
@@ -185,6 +256,8 @@ namespace Apache.Ignite.Internal.Compute
             IgniteArgumentCheck.NotNull(key, nameof(key));
             IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
 
+            var units0 = units as ICollection<DeploymentUnit> ?? 
units.ToList(); // Avoid multiple enumeration.
+
             while (true)
             {
                 var table = await 
GetTableAsync(tableName).ConfigureAwait(false);
@@ -219,7 +292,7 @@ namespace Apache.Ignite.Internal.Compute
                 var serializerHandler = serializerHandlerFunc(table);
                 var colocationHash = serializerHandler.Write(ref w, schema, 
key, keyOnly: true, computeHash: true);
 
-                w.WriteNil(); // DeploymentUnits
+                WriteUnits(units0, bufferWriter);
                 w.Write(jobClassName);
                 w.WriteObjectCollectionAsBinaryTuple(args);
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 90f1102211..72178a5b19 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
@@ -59,6 +60,7 @@ import org.junit.jupiter.params.provider.CsvSource;
 /**
  * Thin client compute integration test.
  */
+@SuppressWarnings("resource")
 public class ItThinClientComputeTest extends ItAbstractThinClientTest {
     /** Test trace id. */
     private static final UUID TRACE_ID = UUID.randomUUID();
@@ -203,6 +205,39 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertEquals(expectedNode, pojoRes);
     }
 
+    @Test
+    void testExecuteOnUnknownUnitWithLatestVersionThrows() {
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () -> client().compute().<String>execute(
+                        Set.of(node(0)),
+                        List.of(new DeploymentUnit("u", "latest")),
+                        NodeNameJob.class.getName()).join());
+
+        var cause = (IgniteException) ex.getCause();
+        assertThat(cause.getMessage(), containsString("Deployment unit 
u:latest doesn’t exist"));
+
+        // TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does 
not propagate to client.
+        assertEquals(INTERNAL_ERR, cause.code());
+    }
+
+    @Test
+    void testExecuteColocatedOnUnknownUnitWithLatestVersionThrows() {
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () -> client().compute().<String>executeColocated(
+                        TABLE_NAME,
+                        Tuple.create().set(COLUMN_KEY, 1),
+                        List.of(new DeploymentUnit("u", "latest")),
+                        NodeNameJob.class.getName()).join());
+
+        var cause = (IgniteException) ex.getCause();
+        assertThat(cause.getMessage(), containsString("Deployment unit 
u:latest doesn’t exist"));
+
+        // TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does 
not propagate to client.
+        assertEquals(INTERNAL_ERR, cause.code());
+    }
+
     @Test
     void testAllSupportedArgTypes() {
         testEchoArg(Byte.MAX_VALUE);

Reply via email to