This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c56f077040 IGNITE-22124 Java thin: Implement Compute MapReduce API
(#3744)
c56f077040 is described below
commit c56f077040e736b087ad239c688f43897cc096f7
Author: Vadim Kolodin <[email protected]>
AuthorDate: Mon May 20 16:09:56 2024 +0300
IGNITE-22124 Java thin: Implement Compute MapReduce API (#3744)
---
.../internal/client/proto/ClientMessagePacker.java | 13 ++
.../client/proto/ClientMessageUnpacker.java | 10 ++
.../ignite/internal/client/proto/ClientOp.java | 5 +
.../handler/ClientInboundMessageHandler.java | 4 +
.../ClientComputeExecuteMapReduceRequest.java | 100 +++++++++++++++
.../apache/ignite/client/ClientOperationType.java | 5 +
.../org/apache/ignite/client/RetryReadPolicy.java | 1 +
.../apache/ignite/internal/client/ClientUtils.java | 3 +
.../internal/client/compute/ClientCompute.java | 65 ++++++++--
.../client/compute/ClientJobExecution.java | 14 +--
.../client/compute/ClientTaskExecution.java | 140 +++++++++++++++++++++
.../internal/client/compute/SubmitTaskResult.java | 41 ++++++
.../client/compute/task/ClientTaskExecution.java | 57 ---------
.../apache/ignite/client/ClientComputeTest.java | 47 +++++++
.../apache/ignite/client/fakes/FakeCompute.java | 58 ++++++++-
.../runner/app/client/ItThinClientComputeTest.java | 122 ++++++++++++++++++
16 files changed, 612 insertions(+), 73 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 1fc36b24ce..1f07669394 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -520,6 +520,19 @@ public class ClientMessagePacker implements AutoCloseable {
buf.writeLongLE(val.getLeastSignificantBits());
}
+ /**
+ * Writes a UUID.
+ *
+ * @param val UUID value.
+ */
+ public void packUuidNullable(@Nullable UUID val) {
+ if (val == null) {
+ packNil();
+ } else {
+ packUuid(val);
+ }
+ }
+
/**
* Writes a bit set.
*
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
index cba6481c9c..ec8086fb11 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
@@ -672,6 +672,16 @@ public class ClientMessageUnpacker implements
AutoCloseable {
return new UUID(buf.readLongLE(), buf.readLongLE());
}
+ /**
+ * Reads a nullable UUID.
+ *
+ * @return UUID or null.
+ * @throws MessageTypeException when type is not UUID.
+ */
+ public @Nullable UUID unpackUuidNullable() {
+ return tryUnpackNil() ? null : unpackUuid();
+ }
+
/**
* Reads a bit set.
*
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 0b83a4869f..bb371faaed 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -170,4 +170,9 @@ public class ClientOp {
/** Execute SQL query with the parameters batch. */
public static final int SQL_EXEC_BATCH = 63;
+
+ /**
+ * Execute MapReduce task.
+ */
+ public static final int COMPUTE_EXECUTE_MAPREDUCE = 64;
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 5669f4d285..a9a75b8405 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRe
import
org.apache.ignite.client.handler.requests.compute.ClientComputeCancelRequest;
import
org.apache.ignite.client.handler.requests.compute.ClientComputeChangePriorityRequest;
import
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteColocatedRequest;
+import
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteMapReduceRequest;
import
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
import
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcCloseRequest;
@@ -735,6 +736,9 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
notificationSender(requestId)
);
+ case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
+ return ClientComputeExecuteMapReduceRequest.process(in, out,
compute, notificationSender(requestId));
+
case ClientOp.COMPUTE_GET_STATUS:
return ClientComputeGetStatusRequest.process(in, out, compute);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
new file mode 100644
index 0000000000..15ea62f430
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.compute;
+
+import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackArgs;
+import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackDeploymentUnits;
+import static
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest.packJobStatus;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.NotificationSender;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.compute.IgniteComputeInternal;
+
+/**
+ * Compute MapReduce request.
+ */
+public class ClientComputeExecuteMapReduceRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param compute Compute.
+ * @param notificationSender Notification sender.
+ * @return Future.
+ */
+ public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteComputeInternal compute,
+ NotificationSender notificationSender) {
+ List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
+ String taskClassName = in.unpackString();
+ Object[] args = unpackArgs(in);
+
+ TaskExecution<Object> execution =
compute.submitMapReduce(deploymentUnits, taskClassName, args);
+ sendTaskResult(execution, notificationSender);
+
+ var idsAsync = execution.idsAsync()
+ .handle((ids, ex) -> {
+ // empty ids in case of split exception to properly
respond with task id and failed status
+ return ex == null ? ids : Collections.<UUID>emptyList();
+ });
+
+ return execution.idAsync()
+ .thenAcceptBoth(idsAsync, (id, ids) -> {
+ out.packUuid(id);
+ packJobIds(out, ids);
+ });
+ }
+
+ static void packJobIds(ClientMessagePacker out, List<UUID> ids) {
+ out.packInt(ids.size());
+ for (var uuid : ids) {
+ out.packUuid(uuid);
+ }
+ }
+
+ static CompletableFuture<Object> sendTaskResult(TaskExecution<Object>
execution, NotificationSender notificationSender) {
+ return execution.resultAsync().whenComplete((val, err) ->
+ execution.statusAsync().whenComplete((status, errStatus) ->
+ execution.statusesAsync().whenComplete((statuses,
errStatuses) ->
+ notificationSender.sendNotification(w -> {
+ w.packObjectAsBinaryTuple(val);
+ packJobStatus(w, status);
+ packJobStatuses(w, statuses);
+ }, firstNotNull(err, errStatus, errStatuses)))
+ ));
+ }
+
+ static void packJobStatuses(ClientMessagePacker w, List<JobStatus>
statuses) {
+ w.packInt(statuses.size());
+ for (JobStatus status : statuses) {
+ packJobStatus(w, status);
+ }
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index f62cecfc90..820e111de4 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -132,6 +132,11 @@ public enum ClientOperationType {
*/
COMPUTE_EXECUTE,
+ /**
+ * Compute Execute MapReduce ({@link
org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, String,
Object...)}).
+ */
+ COMPUTE_EXECUTE_MAPREDUCE,
+
/**
* Get compute job status ({@link
org.apache.ignite.compute.JobExecution#statusAsync()}).
*/
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index 482bc8a0e9..4912f2a2db 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -39,6 +39,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
case TUPLE_UPSERT:
case COMPUTE_EXECUTE:
+ case COMPUTE_EXECUTE_MAPREDUCE:
case COMPUTE_GET_STATUS:
case COMPUTE_CANCEL:
case COMPUTE_CHANGE_PRIORITY:
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index dde688c5ce..bc76849b24 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -215,6 +215,9 @@ public class ClientUtils {
case ClientOp.COMPUTE_EXECUTE_COLOCATED:
return ClientOperationType.COMPUTE_EXECUTE;
+ case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
+ return ClientOperationType.COMPUTE_EXECUTE_MAPREDUCE;
+
case ClientOp.COMPUTE_GET_STATUS:
return ClientOperationType.COMPUTE_GET_STATUS;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 63507b113a..efac50488b 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.client.compute;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -251,8 +253,10 @@ public class ClientCompute implements IgniteCompute {
@Override
public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units,
String taskClassName, Object... args) {
- // TODO https://issues.apache.org/jira/browse/IGNITE-22124
- throw new UnsupportedOperationException("Not implemented yet.");
+ Objects.requireNonNull(units);
+ Objects.requireNonNull(taskClassName);
+
+ return new ClientTaskExecution<>(ch, doExecuteMapReduceAsync(units,
taskClassName, args));
}
@Override
@@ -260,6 +264,20 @@ public class ClientCompute implements IgniteCompute {
return sync(executeMapReduceAsync(units, taskClassName, args));
}
+ private CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
+ List<DeploymentUnit> units,
+ String taskClassName,
+ Object... args) {
+ return ch.serviceAsync(
+ ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
+ w -> packTask(w.out(), units, taskClassName, args),
+ ClientCompute::unpackSubmitTaskResult,
+ null,
+ null,
+ true
+ );
+ }
+
private CompletableFuture<SubmitResult> executeOnNodesAsync(
Set<ClusterNode> nodes,
List<DeploymentUnit> units,
@@ -418,11 +436,7 @@ public class ClientCompute implements IgniteCompute {
String jobClassName,
JobExecutionOptions options,
Object[] args) {
- w.packInt(units.size());
- for (DeploymentUnit unit : units) {
- w.packString(unit.name());
- w.packString(unit.version().render());
- }
+ packDeploymentUnits(w, units);
w.packString(jobClassName);
w.packInt(options.priority());
@@ -430,6 +444,23 @@ public class ClientCompute implements IgniteCompute {
w.packObjectArrayAsBinaryTuple(args);
}
+ private static void packTask(ClientMessagePacker w,
+ List<DeploymentUnit> units,
+ String taskClassName,
+ Object[] args) {
+ packDeploymentUnits(w, units);
+ w.packString(taskClassName);
+ w.packObjectArrayAsBinaryTuple(args);
+ }
+
+ private static void packDeploymentUnits(ClientMessagePacker w,
List<DeploymentUnit> units) {
+ w.packInt(units.size());
+ for (DeploymentUnit unit : units) {
+ w.packString(unit.name());
+ w.packString(unit.version().render());
+ }
+ }
+
/**
* Unpacks job id from channel and gets notification future. This is
needed because we need to unpack message response in the payload
* reader because the unpacker will be closed after the response is
processed.
@@ -441,6 +472,26 @@ public class ClientCompute implements IgniteCompute {
return new SubmitResult(ch.in().unpackUuid(), ch.notificationFuture());
}
+ /**
+ * Unpacks coordination job id and jobs ids which are executing under this
task from channel and gets notification future.
+ * This is needed because we need to unpack message response in the payload
+ * reader because the unpacker will be closed after the response is
processed.
+ *
+ * @param ch Payload channel.
+ * @return Result of the task submission.
+ */
+ private static SubmitTaskResult unpackSubmitTaskResult(PayloadInputChannel
ch) {
+ var jobId = ch.in().unpackUuid();
+
+ var size = ch.in().unpackInt();
+ List<UUID> jobIds = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ jobIds.add(ch.in().unpackUuid());
+ }
+
+ return new SubmitTaskResult(jobId, jobIds, ch.notificationFuture());
+ }
+
private static <R> R sync(CompletableFuture<R> future) {
try {
return future.join();
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index c95bd5e8ec..f4535aec1b 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -75,7 +75,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
if (statusFuture.isDone()) {
return statusFuture;
}
- return jobIdFuture.thenCompose(this::getJobStatus);
+ return jobIdFuture.thenCompose(jobId -> getJobStatus(ch, jobId));
}
@Override
@@ -83,7 +83,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
if (statusFuture.isDone()) {
return falseCompletedFuture();
}
- return jobIdFuture.thenCompose(this::cancelJob);
+ return jobIdFuture.thenCompose(jobId -> cancelJob(ch, jobId));
}
@Override
@@ -91,10 +91,10 @@ class ClientJobExecution<R> implements JobExecution<R> {
if (statusFuture.isDone()) {
return falseCompletedFuture();
}
- return jobIdFuture.thenCompose(jobId -> changePriority(jobId,
newPriority));
+ return jobIdFuture.thenCompose(jobId -> changePriority(ch, jobId,
newPriority));
}
- private CompletableFuture<@Nullable JobStatus> getJobStatus(UUID jobId) {
+ static CompletableFuture<@Nullable JobStatus> getJobStatus(ReliableChannel
ch, UUID jobId) {
// Send the request to any node, the request will be broadcast since
client doesn't know which particular node is running the job
// especially in case of colocated execution.
return ch.serviceAsync(
@@ -107,7 +107,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
);
}
- private CompletableFuture<@Nullable Boolean> cancelJob(UUID jobId) {
+ static CompletableFuture<@Nullable Boolean> cancelJob(ReliableChannel ch,
UUID jobId) {
// Send the request to any node, the request will be broadcast since
client doesn't know which particular node is running the job
// especially in case of colocated execution.
return ch.serviceAsync(
@@ -120,7 +120,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
);
}
- private CompletableFuture<@Nullable Boolean> changePriority(UUID jobId,
int newPriority) {
+ static CompletableFuture<@Nullable Boolean> changePriority(ReliableChannel
ch, UUID jobId, int newPriority) {
// Send the request to any node, the request will be broadcast since
client doesn't know which particular node is running the job
// especially in case of colocated execution.
return ch.serviceAsync(
@@ -136,7 +136,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
);
}
- private static @Nullable JobStatus unpackJobStatus(PayloadInputChannel
payloadInputChannel) {
+ static @Nullable JobStatus unpackJobStatus(PayloadInputChannel
payloadInputChannel) {
ClientMessageUnpacker unpacker = payloadInputChannel.in();
if (unpacker.tryUnpackNil()) {
return null;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
new file mode 100644
index 0000000000..9df7d73b6a
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute;
+
+import static
org.apache.ignite.internal.client.compute.ClientJobExecution.cancelJob;
+import static
org.apache.ignite.internal.client.compute.ClientJobExecution.changePriority;
+import static
org.apache.ignite.internal.client.compute.ClientJobExecution.getJobStatus;
+import static
org.apache.ignite.internal.client.compute.ClientJobExecution.unpackJobStatus;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.client.PayloadInputChannel;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client compute task implementation.
+ *
+ * @param <R> Task result type.
+ */
+class ClientTaskExecution<R> implements TaskExecution<R> {
+ private final ReliableChannel ch;
+
+ private final CompletableFuture<UUID> jobIdFuture;
+
+ private final CompletableFuture<List<@Nullable UUID>> jobIdsFuture;
+
+ private final CompletableFuture<R> resultAsync;
+
+ // Local status cache
+ private final CompletableFuture<@Nullable JobStatus> statusFuture = new
CompletableFuture<>();
+
+ // Local statuses cache
+ private final CompletableFuture<List<@Nullable JobStatus>> statusesFutures
= new CompletableFuture<>();
+
+ ClientTaskExecution(ReliableChannel ch,
CompletableFuture<SubmitTaskResult> reqFuture) {
+ this.ch = ch;
+
+ jobIdFuture = reqFuture.thenApply(SubmitTaskResult::jobId);
+ jobIdsFuture = reqFuture.thenApply(SubmitTaskResult::jobIds);
+
+ resultAsync = reqFuture
+ .thenCompose(SubmitTaskResult::notificationFuture)
+ .thenApply(payloadInputChannel -> {
+ // Notifications require explicit input close.
+ try (payloadInputChannel) {
+ R result = (R)
payloadInputChannel.in().unpackObjectFromBinaryTuple();
+
statusFuture.complete(unpackJobStatus(payloadInputChannel));
+
statusesFutures.complete(unpackJobStatuses(payloadInputChannel));
+ return result;
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return resultAsync;
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobStatus> statusAsync() {
+ if (statusFuture.isDone()) {
+ return statusFuture;
+ }
+ return jobIdFuture.thenCompose(jobId -> getJobStatus(ch, jobId));
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ if (statusFuture.isDone()) {
+ return falseCompletedFuture();
+ }
+ return jobIdFuture.thenCompose(jobId -> cancelJob(ch, jobId));
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
+ if (statusFuture.isDone()) {
+ return falseCompletedFuture();
+ }
+ return jobIdFuture.thenCompose(jobId -> changePriority(ch, jobId,
newPriority));
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+ if (statusesFutures.isDone()) {
+ return statusesFutures;
+ }
+
+ return jobIdsFuture.thenCompose(ids -> {
+ @SuppressWarnings("unchecked")
+ CompletableFuture<@Nullable JobStatus>[] futures = ids.stream()
+ .map(jobId -> getJobStatus(ch, jobId))
+ .toArray(CompletableFuture[]::new);
+
+ return CompletableFutures.allOf(futures)
+ .thenApply(Function.identity());
+ });
+ }
+
+ private static List<@Nullable JobStatus>
unpackJobStatuses(PayloadInputChannel payloadInputChannel) {
+ var unpacker = payloadInputChannel.in();
+ var size = unpacker.unpackInt();
+
+ if (size == 0) {
+ return Collections.emptyList();
+ }
+
+ var statuses = new ArrayList<@Nullable JobStatus>(size);
+ for (int i = 0; i < size; i++) {
+ var status = unpackJobStatus(payloadInputChannel);
+ statuses.add(status);
+ }
+
+ return Collections.unmodifiableList(statuses);
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/SubmitTaskResult.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/SubmitTaskResult.java
new file mode 100644
index 0000000000..71ef494201
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/SubmitTaskResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.PayloadInputChannel;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of the task submission.
+ * Contains unpacked job id, a collection of ids of the jobs which are
executing under this task, and notification future.
+ */
+class SubmitTaskResult extends SubmitResult {
+ private final List<UUID> jobIds;
+
+ SubmitTaskResult(UUID jobId, List<UUID> jobIds,
CompletableFuture<PayloadInputChannel> notificationFuture) {
+ super(jobId, notificationFuture);
+ this.jobIds = jobIds;
+ }
+
+ List<@Nullable UUID> jobIds() {
+ return jobIds;
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
deleted file mode 100644
index dd1b766cef..0000000000
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.client.compute.task;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
-import org.jetbrains.annotations.Nullable;
-
-// TODO https://issues.apache.org/jira/browse/IGNITE-22124
-/**
- * Client compute task implementation.
- *
- * @param <R> Task result type.
- */
-public class ClientTaskExecution<R> implements TaskExecution<R> {
- @Override
- public CompletableFuture<R> resultAsync() {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- @Override
- public CompletableFuture<@Nullable JobStatus> statusAsync() {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- @Override
- public CompletableFuture<@Nullable Boolean> cancelAsync() {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- @Override
- public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
- @Override
- public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 6a58de7d9f..6fb03e47da 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -28,7 +28,10 @@ import static
org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.lang.ErrorGroups.Table.TABLE_NOT_FOUND_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,6 +48,7 @@ import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -216,6 +220,49 @@ public class ClientComputeTest extends
BaseIgniteAbstractTest {
}
}
+ @Test
+ void testMapReduceExecute() throws Exception {
+ initServers(reqId -> false);
+
+ try (var client = getClient(server1)) {
+ Object[] args = {"arg1", 2};
+ String res1 = client.compute().executeMapReduce(List.of(), "job",
args);
+ assertEquals("s1", res1);
+ }
+ }
+
+ @Test
+ void testMapReduceSubmit() throws Exception {
+ initServers(reqId -> false);
+
+ try (var client = getClient(server1)) {
+ TaskExecution<Object> task =
client.compute().submitMapReduce(List.of(), "job");
+
+ assertThat(task.resultAsync(), willBe("s1"));
+
+ assertThat(task.statusAsync(),
willBe(jobStatusWithState(COMPLETED)));
+ assertThat(task.statusesAsync(),
willBe(everyItem(jobStatusWithState(COMPLETED))));
+
+ assertThat("compute task and sub tasks ids must be different",
+ task.idsAsync(),
willBe(not(hasItem(task.idAsync().get()))));
+ }
+ }
+
+ @Test
+ void testMapReduceException() throws Exception {
+ initServers(reqId -> false);
+
+ try (var client = getClient(server1)) {
+ FakeCompute.future = CompletableFuture.failedFuture(new
RuntimeException("job failed"));
+
+ TaskExecution<Object> execution =
client.compute().submitMapReduce(List.of(), "job");
+
+ assertThat(execution.resultAsync(),
willThrowFast(IgniteException.class));
+ assertThat(execution.statusAsync(),
willBe(jobStatusWithState(FAILED)));
+ assertThat(execution.statusesAsync(),
willBe(everyItem(jobStatusWithState(FAILED))));
+ }
+ }
+
@Test
void testUnitsPropagation() throws Exception {
initServers(reqId -> false);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 3d5c9d79d0..6dc1c72a07 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.IgniteCompute;
@@ -190,7 +191,7 @@ public class FakeCompute implements IgniteComputeInternal {
@Override
public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units,
String taskClassName, Object... args) {
- return null;
+ return taskExecution(future != null ? future : completedFuture((R)
nodeName));
}
@Override
@@ -215,7 +216,7 @@ public class FakeCompute implements IgniteComputeInternal {
result.whenComplete((r, throwable) -> {
JobState state = throwable != null ? FAILED : COMPLETED;
- JobStatus newStatus =
status.toBuilder().state(state).finishTime(Instant.now()).build();
+ JobStatus newStatus =
status.toBuilder().id(jobId).state(state).finishTime(Instant.now()).build();
statuses.put(jobId, newStatus);
});
return new JobExecution<>() {
@@ -241,6 +242,59 @@ public class FakeCompute implements IgniteComputeInternal {
};
}
+ private <R> TaskExecution<R> taskExecution(CompletableFuture<R> result) {
+ BiFunction<UUID, JobState, JobStatus> toStatus = (id, jobState) ->
+ JobStatus.builder()
+ .id(id)
+ .state(jobState)
+ .createTime(Instant.now())
+ .startTime(Instant.now())
+ .build();
+
+ UUID jobId = UUID.randomUUID();
+ UUID subJobId1 = UUID.randomUUID();
+ UUID subJobId2 = UUID.randomUUID();
+
+ statuses.put(jobId, toStatus.apply(jobId, EXECUTING));
+ statuses.put(subJobId1, toStatus.apply(subJobId1, EXECUTING));
+ statuses.put(subJobId2, toStatus.apply(subJobId2, EXECUTING));
+
+ result.whenComplete((r, throwable) -> {
+ JobState state = throwable != null ? FAILED : COMPLETED;
+
+ statuses.put(jobId, toStatus.apply(jobId, state));
+ statuses.put(subJobId1, toStatus.apply(subJobId1, state));
+ statuses.put(subJobId2, toStatus.apply(subJobId2, state));
+ });
+
+ return new TaskExecution<>() {
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobStatus> statusAsync() {
+ return completedFuture(statuses.get(jobId));
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobStatus>>
statusesAsync() {
+ return completedFuture(List.of(statuses.get(subJobId1),
statuses.get(subJobId2)));
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return trueCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean>
changePriorityAsync(int newPriority) {
+ return trueCompletedFuture();
+ }
+ };
+ }
+
@Override
public CompletableFuture<Collection<JobStatus>> statusesAsync() {
return completedFuture(statuses.values());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 1e7bb01e2e..ca2e22cf2d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -32,8 +32,12 @@ import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,13 +74,20 @@ import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Thin client compute integration test.
@@ -668,6 +679,46 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertEquals(expected, res);
}
+ @Test
+ void testExecuteMapReduce() throws Exception {
+ TaskExecution<String> execution =
client().compute().submitMapReduce(List.of(),
MapReduceNodeNameTask.class.getName());
+
+ List<Matcher<? super String>> nodeNames = sortedNodes().stream()
+ .map(ClusterNode::name)
+ .map(Matchers::containsString)
+ .collect(Collectors.toList());
+ assertThat(execution.resultAsync(), willBe(allOf(nodeNames)));
+
+ assertThat(execution.statusAsync(),
willBe(jobStatusWithState(COMPLETED)));
+ assertThat(execution.statusesAsync(),
willBe(everyItem(jobStatusWithState(COMPLETED))));
+
+ assertThat("compute task and sub tasks ids must be different",
+ execution.idsAsync(),
willBe(not(hasItem(execution.idAsync().get()))));
+ }
+
+ @Test
+ void testExecuteMapReduceWithArgs() {
+ TaskExecution<String> execution = client().compute()
+ .submitMapReduce(List.of(), MapReduceArgsTask.class.getName(),
1, "2", 3.3);
+
+ assertThat(execution.resultAsync(), willBe(containsString("1_2_3.3")));
+ assertThat(execution.statusAsync(),
willBe(jobStatusWithState(COMPLETED)));
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {MapReduceExceptionOnSplitTask.class,
MapReduceExceptionOnReduceTask.class})
+ void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) {
+ IgniteException cause = getExceptionInJobExecutionAsync(
+ client().compute().submitMapReduce(List.of(),
taskClass.getName())
+ );
+
+ assertThat(cause.getMessage(), containsString("Custom job error"));
+ assertEquals(TRACE_ID, cause.traceId());
+ assertEquals(COLUMN_ALREADY_EXISTS_ERR, cause.code());
+ assertInstanceOf(CustomException.class, cause);
+ assertNull(cause.getCause()); // No stack trace by default.
+ }
+
private void testEchoArg(Object arg) {
Object res = client().compute().execute(Set.of(node(0)), List.of(),
EchoJob.class.getName(), arg, arg.toString());
@@ -755,6 +806,77 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
}
}
+ private static class MapReduceNodeNameTask implements
MapReduceTask<String> {
+ @Override
+ public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ return context.ignite().clusterNodes().stream()
+ .map(node -> ComputeJobRunner.builder()
+ .jobClassName(NodeNameJob.class.getName())
+ .nodes(Set.of(node))
+ .args(args)
+ .build())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public String reduce(Map<UUID, ?> results) {
+ return results.values().stream()
+ .map(String.class::cast)
+ .collect(Collectors.joining(","));
+ }
+ }
+
+ private static class MapReduceArgsTask implements MapReduceTask<String> {
+ @Override
+ public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ return context.ignite().clusterNodes().stream()
+ .map(node -> ComputeJobRunner.builder()
+ .jobClassName(ConcatJob.class.getName())
+ .nodes(Set.of(node))
+ .args(args)
+ .build())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public String reduce(Map<UUID, ?> results) {
+ return results.values().stream()
+ .map(String.class::cast)
+ .collect(Collectors.joining(","));
+ }
+ }
+
+ private static class MapReduceExceptionOnSplitTask implements
MapReduceTask<String> {
+ @Override
+ public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR,
"Custom job error", null);
+ }
+
+ @Override
+ public String reduce(Map<UUID, ?> results) {
+ return "expected split exception";
+ }
+ }
+
+ private static class MapReduceExceptionOnReduceTask implements
MapReduceTask<String> {
+
+ @Override
+ public List<ComputeJobRunner> split(TaskExecutionContext context,
Object... args) {
+ return context.ignite().clusterNodes().stream()
+ .map(node -> ComputeJobRunner.builder()
+ .jobClassName(NodeNameJob.class.getName())
+ .nodes(Set.of(node))
+ .args(args)
+ .build())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public String reduce(Map<UUID, ?> results) {
+ throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR,
"Custom job error", null);
+ }
+ }
+
/**
* Custom public exception class.
*/