This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0fc99e9a4a IGNITE-19624 Java client: propagate compute deployment
units (#2230)
0fc99e9a4a is described below
commit 0fc99e9a4a788a3f7bbdb64dec70f641dac6a421
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jun 22 14:01:39 2023 +0300
IGNITE-19624 Java client: propagate compute deployment units (#2230)
---
.../ClientComputeExecuteColocatedRequest.java | 5 ++-
.../compute/ClientComputeExecuteRequest.java | 22 ++++++++++++-
.../internal/client/compute/ClientCompute.java | 36 +++++++++++++++-------
.../apache/ignite/client/ClientComputeTest.java | 19 ++++++++++++
.../apache/ignite/client/fakes/FakeCompute.java | 9 ++++++
.../ignite/internal/compute/IgniteComputeImpl.java | 10 ++++--
.../ignite/client/detail/compute/compute_impl.cpp | 2 ++
.../Apache.Ignite/Internal/Compute/Compute.cs | 2 ++
8 files changed, 89 insertions(+), 16 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index 65a3b75e5d..97bb453404 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -18,11 +18,13 @@
package org.apache.ignite.client.handler.requests.compute;
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackArgs;
+import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackDeploymentUnits;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -49,10 +51,11 @@ public class ClientComputeExecuteColocatedRequest {
var table = readTable(in, tables);
var keyTuple = readTuple(in, table, true);
+ List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
String jobClassName = in.unpackString();
Object[] args = unpackArgs(in);
- return compute.executeColocated(table.name(), keyTuple, List.of(),
jobClassName, args).thenAccept(val -> {
+ return compute.executeColocated(table.name(), keyTuple,
deploymentUnits, jobClassName, args).thenAccept(val -> {
out.packInt(table.schemaView().lastSchemaVersion());
out.packObjectAsBinaryTuple(val);
});
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 8a94c70e98..3b502039b7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.client.handler.requests.compute;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -54,11 +56,12 @@ public class ClientComputeExecuteRequest {
throw new IgniteException("Specified node is not present in the
cluster: " + nodeName);
}
+ List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
String jobClassName = in.unpackString();
Object[] args = unpackArgs(in);
- return compute.execute(Set.of(node), List.of(), jobClassName,
args).thenAccept(out::packObjectAsBinaryTuple);
+ return compute.execute(Set.of(node), deploymentUnits, jobClassName,
args).thenAccept(out::packObjectAsBinaryTuple);
}
/**
@@ -70,4 +73,21 @@ public class ClientComputeExecuteRequest {
static Object[] unpackArgs(ClientMessageUnpacker in) {
return in.unpackObjectArrayFromBinaryTuple();
}
+
+ /**
+ * Unpacks deployment units.
+ *
+ * @param in Unpacker.
+ * @return Deployment units.
+ */
+ static List<DeploymentUnit> unpackDeploymentUnits(ClientMessageUnpacker
in) {
+ int size = in.tryUnpackNil() ? 0 : in.unpackArrayHeader();
+ List<DeploymentUnit> res = new ArrayList<>(size);
+
+ for (int i = 0; i < size; i++) {
+ res.add(new DeploymentUnit(in.unpackString(), in.unpackString()));
+ }
+
+ return res;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index c6cac364a4..db1e029516 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -78,6 +78,7 @@ public class ClientCompute implements IgniteCompute {
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes,
List<DeploymentUnit> units, String jobClassName, Object... args) {
Objects.requireNonNull(nodes);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
if (nodes.isEmpty()) {
@@ -86,7 +87,7 @@ public class ClientCompute implements IgniteCompute {
ClusterNode node = randomNode(nodes);
- return executeOnOneNode(node, jobClassName, args);
+ return executeOnOneNode(node, units, jobClassName, args);
}
/** {@inheritDoc} */
@@ -100,10 +101,11 @@ public class ClientCompute implements IgniteCompute {
) {
Objects.requireNonNull(tableName);
Objects.requireNonNull(key);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
return getTable(tableName)
- .thenCompose(table -> (CompletableFuture<R>)
executeColocatedTupleKey(table, key, jobClassName, args))
+ .thenCompose(table -> (CompletableFuture<R>)
executeColocatedTupleKey(table, key, units, jobClassName, args))
.handle((res, err) -> handleMissingTable(tableName, res, err))
.thenCompose(r ->
// If a table was dropped, try again: maybe a new
table was created with the same name and new id.
@@ -125,10 +127,11 @@ public class ClientCompute implements IgniteCompute {
Objects.requireNonNull(tableName);
Objects.requireNonNull(key);
Objects.requireNonNull(keyMapper);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
return getTable(tableName)
- .thenCompose(table -> (CompletableFuture<R>)
executeColocatedObjectKey(table, key, keyMapper, jobClassName, args))
+ .thenCompose(table -> (CompletableFuture<R>)
executeColocatedObjectKey(table, key, keyMapper, units, jobClassName, args))
.handle((res, err) -> handleMissingTable(tableName, res, err))
.thenCompose(r ->
// If a table was dropped, try again: maybe a new
table was created with the same name and new id.
@@ -146,12 +149,13 @@ public class ClientCompute implements IgniteCompute {
Object... args
) {
Objects.requireNonNull(nodes);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
Map<ClusterNode, CompletableFuture<R>> map = new
HashMap<>(nodes.size());
for (ClusterNode node : nodes) {
- if (map.put(node, executeOnOneNode(node, jobClassName, args)) !=
null) {
+ if (map.put(node, executeOnOneNode(node, units, jobClassName,
args)) != null) {
throw new IllegalStateException("Node can't be specified more
than once: " + node);
}
}
@@ -159,7 +163,7 @@ public class ClientCompute implements IgniteCompute {
return map;
}
- private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node, String
jobClassName, Object[] args) {
+ private <R> CompletableFuture<R> executeOnOneNode(ClusterNode node,
List<DeploymentUnit> units, String jobClassName, Object[] args) {
return ch.serviceAsync(ClientOp.COMPUTE_EXECUTE, w -> {
if
(w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) {
w.out().packNil();
@@ -167,8 +171,7 @@ public class ClientCompute implements IgniteCompute {
w.out().packString(node.name());
}
- w.out().packString(jobClassName);
- w.out().packObjectArrayAsBinaryTuple(args);
+ packJob(w.out(), units, jobClassName, args);
}, r -> (R) r.in().unpackObjectFromBinaryTuple(), node.name(), null,
null);
}
@@ -191,6 +194,7 @@ public class ClientCompute implements IgniteCompute {
ClientTable t,
K key,
Mapper<K> keyMapper,
+ List<DeploymentUnit> units,
String jobClassName,
Object[] args) {
return t.doSchemaOutOpAsync(
@@ -203,8 +207,7 @@ public class ClientCompute implements IgniteCompute {
ClientRecordSerializer.writeRecRaw(key, keyMapper, schema,
w, TuplePart.KEY);
- w.packString(jobClassName);
- w.packObjectArrayAsBinaryTuple(args);
+ packJob(w, units, jobClassName, args);
},
r -> (R) r.unpackObjectFromBinaryTuple(),
ClientTupleSerializer.getPartitionAwarenessProvider(null,
keyMapper, key));
@@ -213,6 +216,7 @@ public class ClientCompute implements IgniteCompute {
private static <R> CompletableFuture<R> executeColocatedTupleKey(
ClientTable t,
Tuple key,
+ List<DeploymentUnit> units,
String jobClassName,
Object[] args) {
return t.doSchemaOutOpAsync(
@@ -225,8 +229,7 @@ public class ClientCompute implements IgniteCompute {
ClientTupleSerializer.writeTupleRaw(key, schema,
outputChannel, true);
- w.packString(jobClassName);
- w.packObjectArrayAsBinaryTuple(args);
+ packJob(w, units, jobClassName, args);
},
r -> (R) r.unpackObjectFromBinaryTuple(),
ClientTupleSerializer.getPartitionAwarenessProvider(null,
key));
@@ -274,4 +277,15 @@ public class ClientCompute implements IgniteCompute {
return res;
}
+
+ private static void packJob(ClientMessagePacker w, List<DeploymentUnit>
units, String jobClassName, Object[] args) {
+ w.packArrayHeader(units.size());
+ for (DeploymentUnit unit : units) {
+ w.packString(unit.name());
+ w.packString(unit.version().render());
+ }
+
+ w.packString(jobClassName);
+ w.packObjectArrayAsBinaryTuple(args);
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 3d811918cb..c884907379 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -30,8 +30,11 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
+import org.apache.ignite.client.fakes.FakeCompute;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.TableNotFoundException;
@@ -158,6 +161,22 @@ public class ClientComputeTest {
}
}
+ @Test
+ void testUnitsPropagation() throws Exception {
+ initServers(reqId -> false);
+
+ try (var client = getClient(server1)) {
+ Function<List<DeploymentUnit>, String> getUnits = units ->
+ client.compute().<String>execute(getClusterNodes("s1"),
units, FakeCompute.GET_UNITS).join();
+
+ assertEquals("", getUnits.apply(List.of()));
+ assertEquals("u1:1.2.3", getUnits.apply(List.of(new
DeploymentUnit("u1", "1.2.3"))));
+ assertEquals(
+ "u1:1.2.3,unit2:latest",
+ getUnits.apply(List.of(new DeploymentUnit("u1", "1.2.3"),
new DeploymentUnit("unit2", Version.LATEST))));
+ }
+ }
+
private void initServers(Function<Integer, Boolean> shouldDropConnection) {
ignite = new FakeIgnite();
((FakeIgniteTables) ignite.tables()).createTable(TABLE_NAME);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index a49f40c60c..dc6b13b191 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -19,8 +19,10 @@ package org.apache.ignite.client.fakes;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.network.ClusterNode;
@@ -33,6 +35,8 @@ import org.jetbrains.annotations.Nullable;
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class FakeCompute implements IgniteCompute {
+ public static final String GET_UNITS = "get-units";
+
public static volatile @Nullable CompletableFuture future;
private final String nodeName;
@@ -43,6 +47,11 @@ public class FakeCompute implements IgniteCompute {
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes,
List<DeploymentUnit> units, String jobClassName, Object... args) {
+ if (Objects.equals(jobClassName, GET_UNITS)) {
+ String unitString =
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
+ return CompletableFuture.completedFuture((R) unitString);
+ }
+
return future != null ? future : CompletableFuture.completedFuture((R)
nodeName);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 9655cbc9e4..b7ba5c15b0 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -63,6 +63,7 @@ public class IgniteComputeImpl implements IgniteCompute {
@Override
public <R> CompletableFuture<R> execute(Set<ClusterNode> nodes,
List<DeploymentUnit> units, String jobClassName, Object... args) {
Objects.requireNonNull(nodes);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
if (nodes.isEmpty()) {
@@ -111,6 +112,7 @@ public class IgniteComputeImpl implements IgniteCompute {
) {
Objects.requireNonNull(tableName);
Objects.requireNonNull(key);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
return requiredTable(tableName)
@@ -131,6 +133,7 @@ public class IgniteComputeImpl implements IgniteCompute {
Objects.requireNonNull(tableName);
Objects.requireNonNull(key);
Objects.requireNonNull(keyMapper);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
return requiredTable(tableName)
@@ -150,15 +153,15 @@ public class IgniteComputeImpl implements IgniteCompute {
});
}
- private ClusterNode leaderOfTablePartitionByTupleKey(TableImpl table,
Tuple key) {
+ private static ClusterNode leaderOfTablePartitionByTupleKey(TableImpl
table, Tuple key) {
return requiredLeaderByPartition(table, table.partition(key));
}
- private <K> ClusterNode leaderOfTablePartitionByMappedKey(TableImpl table,
K key, Mapper<K> keyMapper) {
+ private static <K> ClusterNode
leaderOfTablePartitionByMappedKey(TableImpl table, K key, Mapper<K> keyMapper) {
return requiredLeaderByPartition(table, table.partition(key,
keyMapper));
}
- private ClusterNode requiredLeaderByPartition(TableImpl table, int
partitionIndex) {
+ private static ClusterNode requiredLeaderByPartition(TableImpl table, int
partitionIndex) {
ClusterNode leaderNode = table.leaderAssignment(partitionIndex);
if (leaderNode == null) {
throw new IgniteInternalException("Leader not found for partition
" + partitionIndex);
@@ -176,6 +179,7 @@ public class IgniteComputeImpl implements IgniteCompute {
Object... args
) {
Objects.requireNonNull(nodes);
+ Objects.requireNonNull(units);
Objects.requireNonNull(jobClassName);
return nodes.stream()
diff --git
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index 2af86535f2..7fe4fda76c 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -70,6 +70,7 @@ void compute_impl::execute_on_one_node(cluster_node node,
std::string_view job_c
auto writer_func = [&node, job_class_name, args](protocol::writer &writer)
{
writer.write(node.get_name());
+ writer.write_nil(); // DeploymentUnits
writer.write(job_class_name);
write_primitives_as_binary_tuple(writer, args);
};
@@ -109,6 +110,7 @@ void compute_impl::execute_colocated_async(std::string_view
table_name, const ig
writer.write(table->get_id());
writer.write(sch.version);
write_tuple(writer, sch, key, true);
+ writer.write_nil(); // DeploymentUnits
writer.write(job);
write_primitives_as_binary_tuple(writer, args);
};
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index e60cc5dce4..80754f052d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -140,6 +140,7 @@ namespace Apache.Ignite.Internal.Compute
var w = writer.MessageWriter;
w.Write(node.Name);
+ w.WriteNil(); // DeploymentUnits
w.Write(jobClassName);
w.WriteObjectCollectionAsBinaryTuple(args);
}
@@ -218,6 +219,7 @@ namespace Apache.Ignite.Internal.Compute
var serializerHandler = serializerHandlerFunc(table);
var colocationHash = serializerHandler.Write(ref w, schema,
key, keyOnly: true, computeHash: true);
+ w.WriteNil(); // DeploymentUnits
w.Write(jobClassName);
w.WriteObjectCollectionAsBinaryTuple(args);