This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fc99e9a4a IGNITE-19624 Java client: propagate compute deployment units (#2230)
0fc99e9a4a is described below

commit 0fc99e9a4a788a3f7bbdb64dec70f641dac6a421
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jun 22 14:01:39 2023 +0300

    IGNITE-19624 Java client: propagate compute deployment units (#2230)
---
 .../ClientComputeExecuteColocatedRequest.java      |  5 ++-
 .../compute/ClientComputeExecuteRequest.java       | 22 ++++++++++++-
 .../internal/client/compute/ClientCompute.java     | 36 +++++++++++++++-------
 .../apache/ignite/client/ClientComputeTest.java    | 19 ++++++++++++
 .../apache/ignite/client/fakes/FakeCompute.java    |  9 ++++++
 .../ignite/internal/compute/IgniteComputeImpl.java | 10 ++++--
 .../ignite/client/detail/compute/compute_impl.cpp  |  2 ++
 .../Apache.Ignite/Internal/Compute/Compute.cs      |  2 ++
 8 files changed, 89 insertions(+), 16 deletions(-)

diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index 65a3b75e5d..97bb453404 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.client.handler.requests.compute;
 
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackArgs;
+import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackDeploymentUnits;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -49,10 +51,11 @@ public class ClientComputeExecuteColocatedRequest {
         var table = readTable(in, tables);
         var keyTuple = readTuple(in, table, true);
 
+        List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
         Object[] args = unpackArgs(in);
 
-        return compute.executeColocated(table.name(), keyTuple, List.of(), 
jobClassName, args).thenAccept(val -> {
+        return compute.executeColocated(table.name(), keyTuple, 
deploymentUnits, jobClassName, args).thenAccept(val -> {
             out.packInt(table.schemaView().lastSchemaVersion());
             out.packObjectAsBinaryTuple(val);
         });
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 8a94c70e98..3b502039b7 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.client.handler.requests.compute;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -54,11 +56,12 @@ public class ClientComputeExecuteRequest {
             throw new IgniteException("Specified node is not present in the 
cluster: " + nodeName);
         }
 
+        List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
 
         Object[] args = unpackArgs(in);
 
-        return compute.execute(Set.of(node), List.of(), jobClassName, 
args).thenAccept(out::packObjectAsBinaryTuple);
+        return compute.execute(Set.of(node), deploymentUnits, jobClassName, 
args).thenAccept(out::packObjectAsBinaryTuple);
     }
 
     /**
@@ -70,4 +73,21 @@ public class ClientComputeExecuteRequest {
     static Object[] unpackArgs(ClientMessageUnpacker in) {
         return in.unpackObjectArrayFromBinaryTuple();
     }
+
+    /**
+     * Unpacks deployment units.
+     *
+     * @param in Unpacker.
+     * @return Deployment units.
+     */
+    static List<DeploymentUnit> unpackDeploymentUnits(ClientMessageUnpacker 
in) {
+        int size = in.tryUnpackNil() ? 0 : in.unpackArrayHeader();
+        List<DeploymentUnit> res = new ArrayList<>(size);
+
+        for (int i = 0; i < size; i++) {
+            res.add(new DeploymentUnit(in.unpackString(), in.unpackString()));
+        }
+
+        return res;
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index c6cac364a4..db1e029516 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -78,6 +78,7 @@ public class ClientCompute implements IgniteCompute {
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, 
List<DeploymentUnit> units, String jobClassName, Object... args) {
         Objects.requireNonNull(nodes);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         if (nodes.isEmpty()) {
@@ -86,7 +87,7 @@ public class ClientCompute implements IgniteCompute {
 
         ClusterNode node = randomNode(nodes);
 
-        return executeOnOneNode(node, jobClassName, args);
+        return executeOnOneNode(node, units, jobClassName, args);
     }
 
     /** {@inheritDoc} */
@@ -100,10 +101,11 @@ public class ClientCompute implements IgniteCompute {
     ) {
         Objects.requireNonNull(tableName);
         Objects.requireNonNull(key);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         return getTable(tableName)
-                .thenCompose(table -> (CompletableFuture<R>) 
executeColocatedTupleKey(table, key, jobClassName, args))
+                .thenCompose(table -> (CompletableFuture<R>) 
executeColocatedTupleKey(table, key, units, jobClassName, args))
                 .handle((res, err) -> handleMissingTable(tableName, res, err))
                 .thenCompose(r ->
                         // If a table was dropped, try again: maybe a new 
table was created with the same name and new id.
@@ -125,10 +127,11 @@ public class ClientCompute implements IgniteCompute {
         Objects.requireNonNull(tableName);
         Objects.requireNonNull(key);
         Objects.requireNonNull(keyMapper);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         return getTable(tableName)
-                .thenCompose(table -> (CompletableFuture<R>) 
executeColocatedObjectKey(table, key, keyMapper, jobClassName, args))
+                .thenCompose(table -> (CompletableFuture<R>) 
executeColocatedObjectKey(table, key, keyMapper, units, jobClassName, args))
                 .handle((res, err) -> handleMissingTable(tableName, res, err))
                 .thenCompose(r ->
                         // If a table was dropped, try again: maybe a new 
table was created with the same name and new id.
@@ -146,12 +149,13 @@ public class ClientCompute implements IgniteCompute {
             Object... args
     ) {
         Objects.requireNonNull(nodes);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         Map<ClusterNode, CompletableFuture<R>> map = new 
HashMap<>(nodes.size());
 
         for (ClusterNode node : nodes) {
-            if (map.put(node, executeOnOneNode(node, jobClassName, args)) != 
null) {
+            if (map.put(node, executeOnOneNode(node, units, jobClassName, 
args)) != null) {
                 throw new IllegalStateException("Node can't be specified more 
than once: " + node);
             }
         }
@@ -159,7 +163,7 @@ public class ClientCompute implements IgniteCompute {
         return map;
     }
 
-    private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, String 
jobClassName, Object[] args) {
+    private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, 
List<DeploymentUnit> units, String jobClassName, Object[] args) {
         return ch.serviceAsync(ClientOp.COMPUTE_EXECUTE, w -> {
             if 
(w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) {
                 w.out().packNil();
@@ -167,8 +171,7 @@ public class ClientCompute implements IgniteCompute {
                 w.out().packString(node.name());
             }
 
-            w.out().packString(jobClassName);
-            w.out().packObjectArrayAsBinaryTuple(args);
+            packJob(w.out(), units, jobClassName, args);
         }, r -> (R) r.in().unpackObjectFromBinaryTuple(), node.name(), null, 
null);
     }
 
@@ -191,6 +194,7 @@ public class ClientCompute implements IgniteCompute {
             ClientTable t,
             K key,
             Mapper<K> keyMapper,
+            List<DeploymentUnit> units,
             String jobClassName,
             Object[] args) {
         return t.doSchemaOutOpAsync(
@@ -203,8 +207,7 @@ public class ClientCompute implements IgniteCompute {
 
                     ClientRecordSerializer.writeRecRaw(key, keyMapper, schema, 
w, TuplePart.KEY);
 
-                    w.packString(jobClassName);
-                    w.packObjectArrayAsBinaryTuple(args);
+                    packJob(w, units, jobClassName, args);
                 },
                 r -> (R) r.unpackObjectFromBinaryTuple(),
                 ClientTupleSerializer.getPartitionAwarenessProvider(null, 
keyMapper, key));
@@ -213,6 +216,7 @@ public class ClientCompute implements IgniteCompute {
     private static <R> CompletableFuture<R> executeColocatedTupleKey(
             ClientTable t,
             Tuple key,
+            List<DeploymentUnit> units,
             String jobClassName,
             Object[] args) {
         return t.doSchemaOutOpAsync(
@@ -225,8 +229,7 @@ public class ClientCompute implements IgniteCompute {
 
                     ClientTupleSerializer.writeTupleRaw(key, schema, 
outputChannel, true);
 
-                    w.packString(jobClassName);
-                    w.packObjectArrayAsBinaryTuple(args);
+                    packJob(w, units, jobClassName, args);
                 },
                 r -> (R) r.unpackObjectFromBinaryTuple(),
                 ClientTupleSerializer.getPartitionAwarenessProvider(null, 
key));
@@ -274,4 +277,15 @@ public class ClientCompute implements IgniteCompute {
 
         return res;
     }
+
+    private static void packJob(ClientMessagePacker w, List<DeploymentUnit> 
units, String jobClassName, Object[] args) {
+        w.packArrayHeader(units.size());
+        for (DeploymentUnit unit : units) {
+            w.packString(unit.name());
+            w.packString(unit.version().render());
+        }
+
+        w.packString(jobClassName);
+        w.packObjectArrayAsBinaryTuple(args);
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 3d811918cb..c884907379 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -30,8 +30,11 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletionException;
 import java.util.function.Function;
+import org.apache.ignite.client.fakes.FakeCompute;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -158,6 +161,22 @@ public class ClientComputeTest {
         }
     }
 
+    @Test
+    void testUnitsPropagation() throws Exception {
+        initServers(reqId -> false);
+
+        try (var client = getClient(server1)) {
+            Function<List<DeploymentUnit>, String> getUnits = units ->
+                    client.compute().<String>execute(getClusterNodes("s1"), 
units, FakeCompute.GET_UNITS).join();
+
+            assertEquals("", getUnits.apply(List.of()));
+            assertEquals("u1:1.2.3", getUnits.apply(List.of(new 
DeploymentUnit("u1", "1.2.3"))));
+            assertEquals(
+                    "u1:1.2.3,unit2:latest",
+                    getUnits.apply(List.of(new DeploymentUnit("u1", "1.2.3"), 
new DeploymentUnit("unit2", Version.LATEST))));
+        }
+    }
+
     private void initServers(Function<Integer, Boolean> shouldDropConnection) {
         ignite = new FakeIgnite();
         ((FakeIgniteTables) ignite.tables()).createTable(TABLE_NAME);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index a49f40c60c..dc6b13b191 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -19,8 +19,10 @@ package org.apache.ignite.client.fakes;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.network.ClusterNode;
@@ -33,6 +35,8 @@ import org.jetbrains.annotations.Nullable;
  */
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class FakeCompute implements IgniteCompute {
+    public static final String GET_UNITS = "get-units";
+
     public static volatile @Nullable CompletableFuture future;
 
     private final String nodeName;
@@ -43,6 +47,11 @@ public class FakeCompute implements IgniteCompute {
 
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, 
List<DeploymentUnit> units, String jobClassName, Object... args) {
+        if (Objects.equals(jobClassName, GET_UNITS)) {
+            String unitString = 
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
+            return CompletableFuture.completedFuture((R) unitString);
+        }
+
         return future != null ? future : CompletableFuture.completedFuture((R) 
nodeName);
     }
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9655cbc9e4..b7ba5c15b0 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -63,6 +63,7 @@ public class IgniteComputeImpl implements IgniteCompute {
     @Override
     public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, 
List<DeploymentUnit> units, String jobClassName, Object... args) {
         Objects.requireNonNull(nodes);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         if (nodes.isEmpty()) {
@@ -111,6 +112,7 @@ public class IgniteComputeImpl implements IgniteCompute {
     ) {
         Objects.requireNonNull(tableName);
         Objects.requireNonNull(key);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         return requiredTable(tableName)
@@ -131,6 +133,7 @@ public class IgniteComputeImpl implements IgniteCompute {
         Objects.requireNonNull(tableName);
         Objects.requireNonNull(key);
         Objects.requireNonNull(keyMapper);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         return requiredTable(tableName)
@@ -150,15 +153,15 @@ public class IgniteComputeImpl implements IgniteCompute {
                 });
     }
 
-    private ClusterNode leaderOfTablePartitionByTupleKey(TableImpl table, 
Tuple key) {
+    private static ClusterNode leaderOfTablePartitionByTupleKey(TableImpl 
table, Tuple key) {
         return requiredLeaderByPartition(table, table.partition(key));
     }
 
-    private <K> ClusterNode leaderOfTablePartitionByMappedKey(TableImpl table, 
K key, Mapper<K> keyMapper) {
+    private static  <K> ClusterNode 
leaderOfTablePartitionByMappedKey(TableImpl table, K key, Mapper<K> keyMapper) {
         return requiredLeaderByPartition(table, table.partition(key, 
keyMapper));
     }
 
-    private ClusterNode requiredLeaderByPartition(TableImpl table, int 
partitionIndex) {
+    private static ClusterNode requiredLeaderByPartition(TableImpl table, int 
partitionIndex) {
         ClusterNode leaderNode = table.leaderAssignment(partitionIndex);
         if (leaderNode == null) {
             throw new IgniteInternalException("Leader not found for partition 
" + partitionIndex);
@@ -176,6 +179,7 @@ public class IgniteComputeImpl implements IgniteCompute {
             Object... args
     ) {
         Objects.requireNonNull(nodes);
+        Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
         return nodes.stream()
diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index 2af86535f2..7fe4fda76c 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -70,6 +70,7 @@ void compute_impl::execute_on_one_node(cluster_node node, 
std::string_view job_c
 
     auto writer_func = [&node, job_class_name, args](protocol::writer &writer) 
{
         writer.write(node.get_name());
+        writer.write_nil(); // DeploymentUnits
         writer.write(job_class_name);
         write_primitives_as_binary_tuple(writer, args);
     };
@@ -109,6 +110,7 @@ void compute_impl::execute_colocated_async(std::string_view 
table_name, const ig
                         writer.write(table->get_id());
                         writer.write(sch.version);
                         write_tuple(writer, sch, key, true);
+                        writer.write_nil(); // DeploymentUnits
                         writer.write(job);
                         write_primitives_as_binary_tuple(writer, args);
                     };
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index e60cc5dce4..80754f052d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -140,6 +140,7 @@ namespace Apache.Ignite.Internal.Compute
                 var w = writer.MessageWriter;
 
                 w.Write(node.Name);
+                w.WriteNil(); // DeploymentUnits
                 w.Write(jobClassName);
                 w.WriteObjectCollectionAsBinaryTuple(args);
             }
@@ -218,6 +219,7 @@ namespace Apache.Ignite.Internal.Compute
                 var serializerHandler = serializerHandlerFunc(table);
                 var colocationHash = serializerHandler.Write(ref w, schema, 
key, keyOnly: true, computeHash: true);
 
+                w.WriteNil(); // DeploymentUnits
                 w.Write(jobClassName);
                 w.WriteObjectCollectionAsBinaryTuple(args);
 

Reply via email to