This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 01b2306c26 IGNITE-22787 Support Marshallers in MapReduce API (#4260)
01b2306c26 is described below
commit 01b2306c26cda6d5f40dcd03f186b9da18e90214
Author: Aleksandr Pakhomov <[email protected]>
AuthorDate: Thu Aug 29 21:53:40 2024 +0300
IGNITE-22787 Support Marshallers in MapReduce API (#4260)
---
.../org/apache/ignite/compute/TaskDescriptor.java | 57 +++++++++++++++-
.../apache/ignite/compute/task/MapReduceTask.java | 11 ++++
.../ClientComputeExecuteMapReduceRequest.java | 9 ++-
.../internal/client/compute/ClientCompute.java | 32 +++++----
.../client/compute/ClientTaskExecution.java | 6 +-
.../apache/ignite/client/fakes/FakeCompute.java | 76 ++++++++++++++--------
.../internal/compute/TaskExecutionWrapper.java | 12 +++-
.../compute/task/DelegatingTaskExecution.java | 11 +++-
.../compute/task/TaskExecutionInternal.java | 16 ++++-
.../Apache.Ignite/Internal/Compute/Compute.cs | 3 +-
.../apache/ignite/internal/runner/app/Jobs.java | 62 +++++++++++++++++-
.../client/ItThinClientComputeMarshallingTest.java | 55 ++++++++++++----
12 files changed, 284 insertions(+), 66 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
index 511cd0312f..19e4c1f304 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskDescriptor.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.marshalling.Marshaller;
/**
* Compute task descriptor.
@@ -30,12 +31,20 @@ public class TaskDescriptor<T, R> {
private final List<DeploymentUnit> units;
+ private final Marshaller<T, byte[]> splitJobArgumentMarshaller;
+
+ private final Marshaller<R, byte[]> reduceJobResultMarshaller;
+
private TaskDescriptor(
String taskClassName,
- List<DeploymentUnit> units
+ List<DeploymentUnit> units,
+ Marshaller<T, byte[]> splitJobArgumentMarshaller,
+ Marshaller<R, byte[]> reduceJobResultMarshaller
) {
this.taskClassName = taskClassName;
this.units = units;
+ this.splitJobArgumentMarshaller = splitJobArgumentMarshaller;
+ this.reduceJobResultMarshaller = reduceJobResultMarshaller;
}
/**
@@ -56,6 +65,24 @@ public class TaskDescriptor<T, R> {
return units;
}
+ /**
+ * Marshaller for split job argument.
+ *
+ * @return Marshaller for split job argument.
+ */
+ public Marshaller<T, byte[]> splitJobArgumentMarshaller() {
+ return splitJobArgumentMarshaller;
+ }
+
+ /**
+ * Marshaller for reduce job result.
+ *
+ * @return Marshaller for reduce job result.
+ */
+ public Marshaller<R, byte[]> reduceJobResultMarshaller() {
+ return reduceJobResultMarshaller;
+ }
+
/**
* Create a new builder.
*
@@ -84,6 +111,8 @@ public class TaskDescriptor<T, R> {
public static class Builder<T, R> {
private final String taskClassName;
private List<DeploymentUnit> units;
+ private Marshaller<T, byte[]> splitJobArgumentMarshaller;
+ private Marshaller<R, byte[]> reduceJobResultMarshaller;
private Builder(String taskClassName) {
Objects.requireNonNull(taskClassName);
@@ -113,6 +142,27 @@ public class TaskDescriptor<T, R> {
return this;
}
+ /**
+ * Sets the marshaller for split job argument.
+ *
+ * @param splitJobArgumentMarshaller Marshaller for split job argument.
+ * @return This builder.
+ */
+ public Builder<T, R> splitJobArgumentMarshaller(Marshaller<T, byte[]>
splitJobArgumentMarshaller) {
+ this.splitJobArgumentMarshaller = splitJobArgumentMarshaller;
+ return this;
+ }
+
+ /**
+ * Sets the marshaller for reduce job result.
+ *
+ * @param reduceJobResultMarshaller Marshaller for reduce job result.
+ * @return This builder.
+ */
+ public Builder<T, R> reduceJobArgumentMarshaller(Marshaller<R, byte[]>
reduceJobResultMarshaller) {
+ this.reduceJobResultMarshaller = reduceJobResultMarshaller;
+ return this;
+ }
/**
* Builds the task descriptor.
@@ -122,8 +172,11 @@ public class TaskDescriptor<T, R> {
public TaskDescriptor<T, R> build() {
return new TaskDescriptor<>(
taskClassName,
- units == null ? List.of() : units
+ units == null ? List.of() : units,
+ splitJobArgumentMarshaller,
+ reduceJobResultMarshaller
);
}
+
}
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
index 69fea09a8b..c824ba6505 100644
---
a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
+++
b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -53,4 +54,14 @@ public interface MapReduceTask<I, M, T, R> {
* @return Final task result future.
*/
CompletableFuture<R> reduceAsync(TaskExecutionContext taskContext,
Map<UUID, T> results);
+
+ /** The marshaller that is called to unmarshal split job argument if not
null. */
+ default @Nullable Marshaller<I, byte[]> splitJobInputMarshaller() {
+ return null;
+ }
+
+ /** The marshaller that is called to marshal reduce job result if not
null. */
+ default @Nullable Marshaller<R, byte[]> reduceJobResultMarshaller() {
+ return null;
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index b7bf2d4090..306e5ac1db 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -31,9 +31,12 @@ import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
+import org.apache.ignite.internal.compute.MarshallerProvider;
+import org.apache.ignite.marshalling.Marshaller;
/**
* Compute MapReduce request.
@@ -82,11 +85,13 @@ public class ClientComputeExecuteMapReduceRequest {
}
static CompletableFuture<Object> sendTaskResult(TaskExecution<Object>
execution, NotificationSender notificationSender) {
+ TaskExecution<Object> t = execution;
return execution.resultAsync().whenComplete((val, err) ->
- execution.stateAsync().whenComplete((state, errState) ->
+ t.stateAsync().whenComplete((state, errState) ->
execution.statesAsync().whenComplete((states,
errStates) ->
notificationSender.sendNotification(w -> {
- w.packObjectAsBinaryTuple(val);
+ Marshaller<Object, byte[]>
resultMarshaller = ((MarshallerProvider<Object>) t).resultMarshaller();
+ ClientComputeJobPacker.packJobResult(val,
resultMarshaller, w);
packTaskState(w, state);
packJobStates(w, states);
}, firstNotNull(err, errState, errStates)))
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 9396584a90..f030f63503 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -122,14 +122,14 @@ public class ClientCompute implements IgniteCompute {
return new ClientJobExecution<>(
ch,
doExecuteColocatedAsync(
- colocatedTarget.tableName(),
- colocatedTarget.key(),
- mapper,
- descriptor.units(),
- descriptor.jobClassName(),
- descriptor.options(),
- descriptor.argumentMarshaller(),
- arg
+ colocatedTarget.tableName(),
+ colocatedTarget.key(),
+ mapper,
+ descriptor.units(),
+ descriptor.jobClassName(),
+ descriptor.options(),
+ descriptor.argumentMarshaller(),
+ arg
),
descriptor.resultMarshaller());
} else {
@@ -228,7 +228,14 @@ public class ClientCompute implements IgniteCompute {
public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
Objects.requireNonNull(taskDescriptor);
- return new ClientTaskExecution<>(ch,
doExecuteMapReduceAsync(taskDescriptor.units(), taskDescriptor.taskClassName(),
arg, null));
+ return new ClientTaskExecution<>(
+ ch,
+ doExecuteMapReduceAsync(
+ taskDescriptor.units(),
taskDescriptor.taskClassName(), arg,
+ taskDescriptor.splitJobArgumentMarshaller()
+ ),
+ taskDescriptor.reduceJobResultMarshaller()
+ );
}
@Override
@@ -236,14 +243,15 @@ public class ClientCompute implements IgniteCompute {
return sync(executeMapReduceAsync(taskDescriptor, arg));
}
- private <T> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
+ private <T, R> CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
List<DeploymentUnit> units,
String taskClassName,
@Nullable T arg,
- @Nullable Marshaller<Object, byte[]> marshaller) {
+ @Nullable Marshaller<T, byte[]> argumentMarshaller
+ ) {
return ch.serviceAsync(
ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
- w -> packTask(w.out(), units, taskClassName, arg, marshaller),
+ w -> packTask(w.out(), units, taskClassName, arg,
(Marshaller<Object, byte[]>) argumentMarshaller),
ClientCompute::unpackSubmitTaskResult,
null,
null,
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
index 84b1270557..65e8364296 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -37,6 +37,8 @@ import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -59,7 +61,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
// Local states cache
private final CompletableFuture<List<@Nullable JobState>> statesFutures =
new CompletableFuture<>();
- ClientTaskExecution(ReliableChannel ch,
CompletableFuture<SubmitTaskResult> reqFuture) {
+ ClientTaskExecution(ReliableChannel ch,
CompletableFuture<SubmitTaskResult> reqFuture, Marshaller<R, byte[]>
resultMarshaller) {
this.ch = ch;
jobIdFuture = reqFuture.thenApply(SubmitTaskResult::jobId);
@@ -70,7 +72,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
.thenApply(payloadInputChannel -> {
// Notifications require explicit input close.
try (payloadInputChannel) {
- R result = (R)
payloadInputChannel.in().unpackObjectFromBinaryTuple();
+ R result = (R)
ClientComputeJobUnpacker.unpackJobResult(resultMarshaller,
payloadInputChannel.in());
stateFuture.complete(unpackTaskState(payloadInputChannel));
statesFutures.complete(unpackJobStates(payloadInputChannel));
return result;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 2a323a24b8..a37b44c249 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -128,7 +128,7 @@ public class FakeCompute implements IgniteComputeInternal {
/** {@inheritDoc} */
@Override
- public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
+ public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
TableViewInternal table,
Tuple key,
List<DeploymentUnit> units,
@@ -261,32 +261,54 @@ public class FakeCompute implements IgniteComputeInternal
{
jobStates.put(subJobId2, toState.apply(subJobId2, status));
});
- return new TaskExecution<>() {
- @Override
- public CompletableFuture<R> resultAsync() {
- return result;
- }
-
- @Override
- public CompletableFuture<@Nullable TaskState> stateAsync() {
- return
completedFuture(TaskStateImpl.toBuilder(jobStates.get(jobId)).build());
- }
-
- @Override
- public CompletableFuture<List<@Nullable JobState>> statesAsync() {
- return completedFuture(List.of(jobStates.get(subJobId1),
jobStates.get(subJobId2)));
- }
-
- @Override
- public CompletableFuture<@Nullable Boolean> cancelAsync() {
- return trueCompletedFuture();
- }
-
- @Override
- public CompletableFuture<@Nullable Boolean>
changePriorityAsync(int newPriority) {
- return trueCompletedFuture();
- }
- };
+ return new FakeTaskExecution<>(result, jobId, subJobId1, subJobId2,
null);
+
+ }
+
+ class FakeTaskExecution<R> implements TaskExecution<R>,
MarshallerProvider<R> {
+ private final CompletableFuture<R> result;
+ private final UUID jobId;
+ private final UUID subJobId1;
+ private final UUID subJobId2;
+ private final Marshaller<R, byte[]> marshaller;
+
+ FakeTaskExecution(CompletableFuture<R> result, UUID jobId, UUID
subJobId1, UUID subJobId2, Marshaller<R, byte[]> marshaller) {
+ this.result = result;
+ this.jobId = jobId;
+ this.subJobId1 = subJobId1;
+ this.subJobId2 = subJobId2;
+ this.marshaller = marshaller;
+ }
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<@Nullable TaskState> stateAsync() {
+ return
completedFuture(TaskStateImpl.toBuilder(jobStates.get(jobId)).build());
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobState>> statesAsync() {
+ return completedFuture(List.of(jobStates.get(subJobId1),
jobStates.get(subJobId2)));
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return trueCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
+ return trueCompletedFuture();
+ }
+
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ return marshaller;
+ }
}
@Override
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
index 9c2c3bbe81..90365ed2bf 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -32,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Result type.
*/
-class TaskExecutionWrapper<R> implements TaskExecution<R> {
+class TaskExecutionWrapper<R> implements TaskExecution<R>,
MarshallerProvider<R> {
private final TaskExecution<R> delegate;
TaskExecutionWrapper(TaskExecution<R> delegate) {
@@ -63,4 +64,13 @@ class TaskExecutionWrapper<R> implements TaskExecution<R> {
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
return
convertToPublicFuture(delegate.changePriorityAsync(newPriority));
}
+
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ if (delegate instanceof MarshallerProvider) {
+ return ((MarshallerProvider<R>) delegate).resultMarshaller();
+ }
+
+ return null;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
index fc2c67ad10..fdf35eafb9 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
@@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.internal.compute.MarshallerProvider;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -29,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
*
* @param <R> Result type.
*/
-public class DelegatingTaskExecution<I, M, T, R> implements TaskExecution<R> {
+public class DelegatingTaskExecution<I, M, T, R> implements TaskExecution<R>,
MarshallerProvider<R> {
private final CompletableFuture<TaskExecutionInternal<I, M, T, R>>
delegate;
public DelegatingTaskExecution(CompletableFuture<TaskExecutionInternal<I,
M, T, R>> delegate) {
@@ -60,4 +62,11 @@ public class DelegatingTaskExecution<I, M, T, R> implements
TaskExecution<R> {
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int
newPriority) {
return delegate.thenCompose(execution ->
execution.changePriorityAsync(newPriority));
}
+
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ assert delegate.isDone() : "Task execution is supposed to be done
before calling `resultMarshaller()`";
+
+ return delegate.join().resultMarshaller();
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 5ce5ae124e..2a359b3ac8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.marshalling.Marshaller.tryUnmarshalOrCast;
import java.time.Instant;
import java.util.Arrays;
@@ -50,11 +51,13 @@ import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.TaskStateImpl;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
@@ -66,7 +69,7 @@ import org.jetbrains.annotations.Nullable;
* @param <R> Task result type.
*/
@SuppressWarnings("unchecked")
-public class TaskExecutionInternal<I, M, T, R> implements TaskExecution<R> {
+public class TaskExecutionInternal<I, M, T, R> implements TaskExecution<R>,
MarshallerProvider<R> {
private static final IgniteLogger LOG =
Loggers.forClass(TaskExecutionInternal.class);
private final QueueExecution<SplitResult<I, M, T, R>> splitExecution;
@@ -81,6 +84,8 @@ public class TaskExecutionInternal<I, M, T, R> implements
TaskExecution<R> {
private final AtomicBoolean isCancelled;
+ private volatile @Nullable Marshaller<R, byte[]> reduceResultMarshallerRef;
+
/**
* Construct an execution object and starts executing.
*
@@ -105,7 +110,9 @@ public class TaskExecutionInternal<I, M, T, R> implements
TaskExecution<R> {
() -> {
MapReduceTask<I, M, T, R> task =
instantiateTask(taskClass);
- return task.splitAsync(context, arg)
+ reduceResultMarshallerRef =
task.reduceJobResultMarshaller();
+
+ return task.splitAsync(context,
tryUnmarshalOrCast(task.splitJobInputMarshaller(), arg))
.thenApply(jobs -> new SplitResult<>(task, jobs));
},
@@ -298,6 +305,11 @@ public class TaskExecutionInternal<I, M, T, R> implements
TaskExecution<R> {
.collect(toList());
}
+ @Override
+ public @Nullable Marshaller<R, byte[]> resultMarshaller() {
+ return reduceResultMarshallerRef;
+ }
+
private static class SplitResult<I, M, T, R> {
private final MapReduceTask<I, M, T, R> task;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index ed988ff82e..9b9f9545d5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -358,7 +358,8 @@ namespace Apache.Ignite.Internal.Compute
static (T, TaskState) Read(MsgPackReader reader)
{
- var res = (T)reader.ReadObjectFromBinaryTuple()!;
+ // TODO IGNITE-23074 .NET: Thin 3.0: Support marshallers in
MapReduce
+ var res = ComputePacker.UnpackResult<T>(ref reader, null);
var state = ReadTaskState(reader);
return (res, state);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
index ac205d37a6..14cef44c83 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/Jobs.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.Nullable;
/** Jobs and marhallers definitions that are used in tests. */
@@ -276,7 +277,7 @@ public class Jobs {
}
/** MapReduce task that splits input list into two parts and sends them to
different nodes. */
- public static class MapReduce implements MapReduceTask<List<String>,
String, String, String> {
+ public static class MapReduce implements MapReduceTask<List<String>,
String, String, List<String>> {
@Override
public CompletableFuture<List<MapReduceJob<String, String>>>
splitAsync(
TaskExecutionContext taskContext,
@@ -293,17 +294,72 @@ public class Jobs {
MapReduceJob.<String, String>builder()
.jobDescriptor(mapJobDescriptor)
.node(nodes.get(0))
+ .args(input.get(0))
.build(),
MapReduceJob.<String, String>builder()
.jobDescriptor(mapJobDescriptor)
.node(nodes.get(1))
+ .args(input.get(1))
.build()
));
}
@Override
- public CompletableFuture<String> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
- return null;
+ public CompletableFuture<List<String>>
reduceAsync(TaskExecutionContext taskContext, Map<UUID, String> results) {
+ return completedFuture(new ArrayList<>(results.values()));
+ }
+
+ @Override
+ public @Nullable Marshaller<List<String>, byte[]>
splitJobInputMarshaller() {
+ return ByteArrayMarshaller.create();
+ }
+
+ @Override
+ public @Nullable Marshaller<List<String>, byte[]>
reduceJobResultMarshaller() {
+ return ByteArrayMarshaller.create();
+ }
+ }
+
+ /** MapReduce that adds a column to the tuple on each step. */
+ public static class MapReduceTuples implements MapReduceTask<Tuple, Tuple,
Tuple, Tuple> {
+ @Override
+ public CompletableFuture<List<MapReduceJob<Tuple, Tuple>>>
splitAsync(TaskExecutionContext taskContext, @Nullable Tuple input) {
+ List<ClusterNode> nodes = new
ArrayList<>(taskContext.ignite().clusterNodes());
+ Tuple jobsInput = Tuple.copy(input);
+ jobsInput.set("split", "call");
+
+ var mapJobDescriptor =
JobDescriptor.builder(EchoTupleJob.class).build();
+
+ return completedFuture(List.of(
+ MapReduceJob.<Tuple, Tuple>builder()
+ .jobDescriptor(mapJobDescriptor)
+ .node(nodes.get(0))
+ .args(jobsInput)
+ .build(),
+ MapReduceJob.<Tuple, Tuple>builder()
+ .jobDescriptor(mapJobDescriptor)
+ .node(nodes.get(1))
+ .args(jobsInput)
+ .build()
+ ));
+ }
+
+ @Override
+ public CompletableFuture<Tuple> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, Tuple> results) {
+ Tuple reduceResult = Tuple.copy((Tuple)
results.values().toArray()[0]);
+ reduceResult.set("reduce", "call");
+
+ return completedFuture(reduceResult);
+ }
+ }
+
+ static class EchoTupleJob implements ComputeJob<Tuple, Tuple> {
+ @Override
+ public @Nullable CompletableFuture<Tuple>
executeAsync(JobExecutionContext context, @Nullable Tuple arg) {
+ var tuple = Tuple.copy(arg);
+ tuple.set("echo", "echo");
+
+ return completedFuture(tuple);
}
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
index 17a13d56d7..512cd659e8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeMarshallingTest.java
@@ -18,6 +18,10 @@
package org.apache.ignite.internal.runner.app.client;
import static org.apache.ignite.catalog.definitions.ColumnDefinition.column;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -39,14 +43,15 @@ import
org.apache.ignite.internal.runner.app.Jobs.ArgumentAndResultMarshallingJo
import org.apache.ignite.internal.runner.app.Jobs.ArgumentStringMarshaller;
import org.apache.ignite.internal.runner.app.Jobs.JsonMarshaller;
import org.apache.ignite.internal.runner.app.Jobs.MapReduce;
+import org.apache.ignite.internal.runner.app.Jobs.MapReduceTuples;
import org.apache.ignite.internal.runner.app.Jobs.PojoArg;
import org.apache.ignite.internal.runner.app.Jobs.PojoJob;
import org.apache.ignite.internal.runner.app.Jobs.PojoResult;
import org.apache.ignite.internal.runner.app.Jobs.ResultMarshallingJob;
import org.apache.ignite.internal.runner.app.Jobs.ResultStringUnMarshaller;
+import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -196,7 +201,6 @@ public class ItThinClientComputeMarshallingTest extends
ItAbstractThinClientTest
}
-
@Test
void submitBroadcast() {
// When.
@@ -264,20 +268,45 @@ public class ItThinClientComputeMarshallingTest extends
ItAbstractThinClientTest
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22787")
void mapReduce() {
- // When run job with custom marshaller for string argument.
- String result = client().compute().executeMapReduce(
- TaskDescriptor.builder(MapReduce.class).build(),
- List.of("Input_0", "Input_1"));
+ // When.
+ List<String> result = client().compute().executeMapReduce(
+ TaskDescriptor.builder(MapReduce.class)
+
.splitJobArgumentMarshaller(ByteArrayMarshaller.create())
+
.reduceJobArgumentMarshaller(ByteArrayMarshaller.create())
+ .build(),
+ // input_O goes to 0 node and input_1 goes to 1 node
+ List.of("Input_0", "Input_1")
+ );
- // Then both client and server marshaller were called.
- assertEquals("Input"
- + ":marshalledOnClient"
- + ":unmarshalledOnServer"
- + ":processedOnServer",
- result
+ // Then.
+ assertThat(
+ result,
+
hasItem(containsString("Input_0:marshalledOnClient:unmarshalledOnServer:processedOnServer"))
+ );
+ // And
+ assertThat(
+ result,
+
hasItem(containsString("Input_1:marshalledOnClient:unmarshalledOnServer:processedOnServer"))
+ );
+ }
+
+ @Test
+ void mapReduceTuples() {
+ // When.
+ Tuple result = client().compute().executeMapReduce(
+ TaskDescriptor.builder(MapReduceTuples.class).build(),
+ Tuple.create().set("from", "client")
);
+
+ // Then.
+ Tuple expectedTuple = Tuple.create();
+ expectedTuple.set("from", "client");
+ expectedTuple.set("split", "call");
+ expectedTuple.set("reduce", "call");
+ expectedTuple.set("echo", "echo");
+
+ assertThat(result, equalTo(expectedTuple));
}
private ClusterNode node(int idx) {