This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e2594a72ee7 IGNITE-26331 Introduce compute execution context (#6508)
e2594a72ee7 is described below

commit e2594a72ee70d4e40d91d25c7028b7e31fec4ea9
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Fri Aug 29 12:43:05 2025 +0300

    IGNITE-26331 Introduce compute execution context (#6508)
---
 .../ClientComputeExecuteColocatedRequest.java      |  17 +--
 .../compute/ClientComputeExecuteRequest.java       |   3 +-
 .../apache/ignite/client/fakes/FakeCompute.java    |  31 +++---
 .../ignite/internal/compute/ComputeComponent.java  |  36 ++-----
 .../internal/compute/ComputeComponentImpl.java     |  56 +++-------
 .../internal/compute/ComputeJobFailover.java       |  38 ++-----
 .../ignite/internal/compute/ExecutionContext.java  | 118 +++++++++++++++++++++
 .../ignite/internal/compute/IgniteComputeImpl.java | 108 ++++++-------------
 .../internal/compute/IgniteComputeInternal.java    |  24 +----
 .../apache/ignite/internal/compute/JobStarter.java |  18 +---
 .../internal/compute/RemoteExecutionContext.java   |  72 -------------
 .../compute/messaging/ComputeMessaging.java        |  29 ++---
 .../internal/compute/ComputeComponentImplTest.java |  23 ++--
 .../internal/compute/IgniteComputeImplTest.java    |  34 +++---
 14 files changed, 260 insertions(+), 347 deletions(-)

diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index a4f6926df47..f2bb27b1e57 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExe
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
@@ -31,10 +32,10 @@ import org.apache.ignite.client.handler.ClientContext;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker;
 import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.Job;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
+import org.apache.ignite.internal.compute.ExecutionContext;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
@@ -71,7 +72,7 @@ public class ClientComputeExecuteColocatedRequest {
         BitSet noValueSet = in.unpackBitSet();
         byte[] tupleBytes = in.readBinary();
 
-        Job job = ClientComputeJobUnpacker.unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+        Job job = unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
         unpackTaskId(in, clientContext.hasFeature(COMPUTE_TASK_ID)); // 
Placeholder for a possible future usage
 
         return readTableAsync(tableId, tables).thenCompose(table -> 
readTuple(schemaId, noValueSet, tupleBytes, table, true)
@@ -84,11 +85,13 @@ public class ClientComputeExecuteColocatedRequest {
                     CompletableFuture<JobExecution<ComputeJobDataHolder>> 
jobExecutionFut = compute.submitColocatedInternal(
                             table,
                             keyTuple,
-                            job.deploymentUnits(),
-                            job.jobClassName(),
-                            job.options(),
-                            metadataBuilder,
-                            job.arg(),
+                            new ExecutionContext(
+                                    job.options(),
+                                    job.deploymentUnits(),
+                                    job.jobClassName(),
+                                    metadataBuilder,
+                                    job.arg()
+                            ),
                             null
                     );
 
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 8c5eb87bb76..02f496299b3 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.Job;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
+import org.apache.ignite.internal.compute.ExecutionContext;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
@@ -86,7 +87,7 @@ public class ClientComputeExecuteRequest {
                 .clientAddress(clientContext.remoteAddress().toString());
 
         CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut = 
compute.executeAsyncWithFailover(
-                candidates, job.deploymentUnits(), job.jobClassName(), 
job.options(), metadataBuilder, job.arg(), null
+                candidates, new ExecutionContext(job.options(), 
job.deploymentUnits(), job.jobClassName(), metadataBuilder, job.arg()), null
         );
         sendResultAndState(executionFut, notificationSender);
 
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 934085e4736..8368b46b091 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -57,6 +57,7 @@ import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionContext;
 import org.apache.ignite.internal.compute.HybridTimestampProvider;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.JobExecutionContextImpl;
@@ -107,14 +108,12 @@ public class FakeCompute implements IgniteComputeInternal 
{
     @Override
     public CompletableFuture<JobExecution<ComputeJobDataHolder>> 
executeAsyncWithFailover(
             Set<ClusterNode> nodes,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
-            @Nullable CancellationToken cancellationToken) {
+            ExecutionContext executionContext,
+            @Nullable CancellationToken cancellationToken
+    ) {
+        String jobClassName = executionContext.jobClassName();
         if (Objects.equals(jobClassName, GET_UNITS)) {
-            String unitString = 
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
+            String unitString = 
executionContext.units().stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
             return completedExecution(unitString);
         }
 
@@ -135,7 +134,7 @@ public class FakeCompute implements IgniteComputeInternal {
             ComputeJob<Object, Object> job = 
ComputeUtils.instantiateJob(jobClass);
             CompletableFuture<Object> jobFut = job.executeAsync(
                     new JobExecutionContextImpl(ignite, new AtomicBoolean(), 
jobClassLoader, null),
-                    SharedComputeUtils.unmarshalArgOrResult(arg, null, null));
+                    
SharedComputeUtils.unmarshalArgOrResult(executionContext.arg(), null, null));
 
             return jobExecution(jobFut != null ? jobFut : 
nullCompletedFuture());
         }
@@ -154,11 +153,7 @@ public class FakeCompute implements IgniteComputeInternal {
     public CompletableFuture<JobExecution<ComputeJobDataHolder>> 
submitColocatedInternal(
             TableViewInternal table,
             Tuple key,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            ComputeJobDataHolder args,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
         return jobExecution(future != null
@@ -202,11 +197,11 @@ public class FakeCompute implements IgniteComputeInternal 
{
 
             return executeAsyncWithFailover(
                     nodes,
-                    descriptor.units(),
-                    descriptor.jobClassName(),
-                    descriptor.options(),
-                    ComputeEventMetadata.builder(),
-                    SharedComputeUtils.marshalArgOrResult(arg, null, 
observableTimestamp.longValue()),
+                    new ExecutionContext(
+                            descriptor,
+                            ComputeEventMetadata.builder(),
+                            SharedComputeUtils.marshalArgOrResult(arg, null, 
observableTimestamp.longValue())
+                    ),
                     cancellationToken
             ).thenApply(internalExecution -> unmarshalingExecution(descriptor, 
internalExecution));
         } else if (target instanceof ColocatedJobTarget) {
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 5f8091c5bb3..a8f4da83a62 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -38,20 +38,12 @@ public interface ComputeComponent extends IgniteComponent {
     /**
      * Executes a job of the given class on the current node.
      *
-     * @param options Job execution options.
-     * @param units Deployment units which will be loaded for execution.
-     * @param jobClassName Name of the job class.
-     * @param metadataBuilder Event metadata builder.
-     * @param arg Job argument.
+     * @param executionContext Execution context.
      * @param cancellationToken Cancellation token or {@code null}.
      * @return Future of the job execution object which will be completed when 
the job is submitted.
      */
     CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
executeLocally(
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     );
 
@@ -59,21 +51,13 @@ public interface ComputeComponent extends IgniteComponent {
      * Executes a job of the given class on a remote node.
      *
      * @param remoteNode Remote node name.
-     * @param options Job execution options.
-     * @param units Deployment units which will be loaded for execution.
-     * @param jobClassName Name of the job class.
-     * @param metadataBuilder Event metadata builder.
-     * @param arg Job argument.
+     * @param executionContext Execution context.
      * @param cancellationToken Cancellation token or {@code null}.
      * @return Future of the job execution object which will be completed when 
the job is submitted.
      */
     CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
executeRemotely(
             ClusterNode remoteNode,
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     );
 
@@ -83,22 +67,14 @@ public interface ComputeComponent extends IgniteComponent {
      *
      * @param remoteNode Remote node name.
      * @param nextWorkerSelector The selector that returns the next worker to 
execute job on.
-     * @param options Job execution options.
-     * @param units Deployment units which will be loaded for execution.
-     * @param jobClassName Name of the job class.
-     * @param metadataBuilder Event metadata builder.
-     * @param arg Job argument.
+     * @param executionContext Execution context.
      * @param cancellationToken Cancellation token or {@code null}.
      * @return Future of the job execution object which will be completed when 
the job is submitted.
      */
     CompletableFuture<JobExecution<ComputeJobDataHolder>> 
executeRemotelyWithFailover(
             ClusterNode remoteNode,
             NextWorkerSelector nextWorkerSelector,
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     );
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 635beab11e8..47feca02b11 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -126,11 +126,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
 
     @Override
     public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
executeLocally(
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
         if (!busyLock.enterBusy()) {
@@ -138,14 +134,12 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
         }
 
         try {
-            CompletableFuture<JobContext> classLoaderFut = 
jobContextManager.acquireClassLoader(units);
+            CompletableFuture<JobContext> classLoaderFut = 
jobContextManager.acquireClassLoader(executionContext.units());
 
             CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
future =
-                    mapClassLoaderExceptions(classLoaderFut, jobClassName)
+                    mapClassLoaderExceptions(classLoaderFut, 
executionContext.jobClassName())
                             .thenApply(context -> {
-                                JobExecutionInternal<ComputeJobDataHolder> 
execution = execJob(
-                                        context, options, jobClassName, 
metadataBuilder, arg
-                                );
+                                JobExecutionInternal<ComputeJobDataHolder> 
execution = execJob(context, executionContext);
                                 execution.resultAsync().whenComplete((result, 
e) -> context.close());
                                 
inFlightFutures.registerFuture(execution.resultAsync());
 
@@ -222,11 +216,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
     @Override
     public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
executeRemotely(
             ClusterNode remoteNode,
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
         if (!busyLock.enterBusy()) {
@@ -234,14 +224,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
         }
 
         try {
-            CompletableFuture<UUID> jobIdFuture = 
messaging.remoteExecuteRequestAsync(
-                    remoteNode,
-                    options,
-                    units,
-                    jobClassName,
-                    metadataBuilder,
-                    arg
-            );
+            CompletableFuture<UUID> jobIdFuture = 
messaging.remoteExecuteRequestAsync(remoteNode, executionContext);
 
             inFlightFutures.registerFuture(jobIdFuture);
 
@@ -267,16 +250,12 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
     public CompletableFuture<JobExecution<ComputeJobDataHolder>> 
executeRemotelyWithFailover(
             ClusterNode remoteNode,
             NextWorkerSelector nextWorkerSelector,
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
         return ComputeJobFailover.failSafeExecute(
                         this, logicalTopologyService, topologyService, 
failoverExecutor, eventLog,
-                        remoteNode, nextWorkerSelector, options, units, 
jobClassName, metadataBuilder, arg
+                        remoteNode, nextWorkerSelector, executionContext
                 )
                 .thenApply(execution -> {
                     // Do not add cancel action to the underlying jobs, let 
the FailSafeJobExecution handle it.
@@ -329,8 +308,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         executor.start();
-        messaging.start((options, units, jobClassName, metadataBuilder, arg) ->
-                executeLocally(options, units, jobClassName, metadataBuilder, 
arg, null));
+        messaging.start(executionContext -> executeLocally(executionContext, 
null));
         executionManager.start();
         computeViewProvider.init(executionManager);
 
@@ -356,15 +334,15 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
         return nullCompletedFuture();
     }
 
-    private JobExecutionInternal<ComputeJobDataHolder> execJob(
-            JobContext context,
-            ExecutionOptions options,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg
-    ) {
+    private JobExecutionInternal<ComputeJobDataHolder> execJob(JobContext 
context, ExecutionContext executionContext) {
         try {
-            return executor.executeJob(options, jobClassName, 
context.classLoader(), metadataBuilder, arg);
+            return executor.executeJob(
+                    executionContext.options(),
+                    executionContext.jobClassName(),
+                    context.classLoader(),
+                    executionContext.metadataBuilder(),
+                    executionContext.arg()
+            );
         } catch (Throwable e) {
             context.close();
             throw e;
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index 2646411ea8c..293667baa50 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -17,18 +17,15 @@
 
 package org.apache.ignite.internal.compute;
 
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
-import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
 import org.apache.ignite.internal.compute.events.ComputeEventsFactory;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -37,7 +34,6 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * This is a helper class for {@link ComputeComponent} to handle job failures. 
You can think about this class as a "retryable compute job
@@ -87,7 +83,7 @@ class ComputeJobFailover {
     /**
      * Context of the called job. Captures deployment units, jobClassName and 
arguments.
      */
-    private final RemoteExecutionContext jobContext;
+    private final ExecutionContext jobContext;
 
     /**
      * Job id of the execution.
@@ -120,11 +116,7 @@ class ComputeJobFailover {
             EventLog eventLog,
             ClusterNode workerNode,
             NextWorkerSelector nextWorkerSelector,
-            ExecutionOptions executionOptions,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg
+            ExecutionContext executionContext
     ) {
         this.computeComponent = computeComponent;
         this.logicalTopologyService = logicalTopologyService;
@@ -135,9 +127,9 @@ class ComputeJobFailover {
         this.nextWorkerSelector = nextWorkerSelector;
 
         // Assign failover job id so that it is consistent for any remote job.
-        metadataBuilder.jobId(jobId);
+        executionContext.metadataBuilder().jobId(jobId);
 
-        this.jobContext = new RemoteExecutionContext(executionOptions, units, 
jobClassName, metadataBuilder, arg);
+        this.jobContext = executionContext;
     }
 
     static CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
failSafeExecute(
@@ -148,11 +140,7 @@ class ComputeJobFailover {
             EventLog eventLog,
             ClusterNode workerNode,
             NextWorkerSelector nextWorkerSelector,
-            ExecutionOptions executionOptions,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg
+            ExecutionContext executionContext
     ) {
         return new ComputeJobFailover(
                 computeComponent,
@@ -162,11 +150,7 @@ class ComputeJobFailover {
                 eventLog,
                 workerNode,
                 nextWorkerSelector,
-                executionOptions,
-                units,
-                jobClassName,
-                metadataBuilder,
-                arg
+                executionContext
         ).execute();
     }
 
@@ -191,15 +175,9 @@ class ComputeJobFailover {
 
     private CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> 
launchJobOn(ClusterNode runningWorkerNode) {
         if 
(runningWorkerNode.name().equals(topologyService.localMember().name())) {
-            return computeComponent.executeLocally(
-                    jobContext.executionOptions(), jobContext.units(), 
jobContext.jobClassName(),
-                    jobContext.metadataBuilder(), jobContext.arg(), null
-            );
+            return computeComponent.executeLocally(jobContext, null);
         } else {
-            return computeComponent.executeRemotely(
-                    runningWorkerNode, jobContext.executionOptions(), 
jobContext.units(), jobContext.jobClassName(),
-                    jobContext.metadataBuilder(), jobContext.arg(), null
-            );
+            return computeComponent.executeRemotely(runningWorkerNode, 
jobContext, null);
         }
     }
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
new file mode 100644
index 00000000000..5a97a13d26d
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.List;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Captures the context of a job execution.
+ */
+public class ExecutionContext {
+    private final ExecutionOptions options;
+
+    private final List<DeploymentUnit> units;
+
+    private final String jobClassName;
+
+    private final ComputeEventMetadataBuilder metadataBuilder;
+
+    private final ComputeJobDataHolder arg;
+
+    /**
+     * Creates new execution context.
+     *
+     * @param options Job execution options.
+     * @param units Deployment units which will be loaded for execution.
+     * @param jobClassName Name of the job class.
+     * @param metadataBuilder Event metadata builder.
+     * @param arg Job argument.
+     */
+    public ExecutionContext(
+            ExecutionOptions options,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            ComputeEventMetadataBuilder metadataBuilder,
+            @Nullable ComputeJobDataHolder arg
+    ) {
+        this.options = options;
+        this.units = units;
+        this.jobClassName = jobClassName;
+        this.metadataBuilder = metadataBuilder;
+        this.arg = arg;
+    }
+
+    /**
+     * Creates new execution context.
+     *
+     * @param jobExecutionOptions Job execution options.
+     * @param units Deployment units which will be loaded for execution.
+     * @param jobClassName Name of the job class.
+     * @param metadataBuilder Event metadata builder.
+     * @param arg Job argument.
+     */
+    public ExecutionContext(
+            JobExecutionOptions jobExecutionOptions,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            ComputeEventMetadataBuilder metadataBuilder,
+            @Nullable ComputeJobDataHolder arg
+    ) {
+        this(ExecutionOptions.from(jobExecutionOptions), units, jobClassName, 
metadataBuilder, arg);
+    }
+
+    /**
+     * Creates new execution context. Takes execution options, deployment 
units and job class name from a job descriptor.
+     *
+     * @param descriptor Job descriptor.
+     * @param metadataBuilder Event metadata builder.
+     * @param arg Job argument.
+     */
+    public <T, R> ExecutionContext(
+            JobDescriptor<T, R> descriptor,
+            ComputeEventMetadataBuilder metadataBuilder,
+            @Nullable ComputeJobDataHolder arg
+    ) {
+        this(descriptor.options(), descriptor.units(), 
descriptor.jobClassName(), metadataBuilder, arg);
+    }
+
+    public ExecutionOptions options() {
+        return options;
+    }
+
+    public List<DeploymentUnit> units() {
+        return units;
+    }
+
+    public String jobClassName() {
+        return jobClassName;
+    }
+
+    public ComputeEventMetadataBuilder metadataBuilder() {
+        return metadataBuilder;
+    }
+
+    @Nullable
+    public ComputeJobDataHolder arg() {
+        return arg;
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index cdb08ae8949..a54f82f6de0 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -163,15 +163,13 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
         Objects.requireNonNull(descriptor);
 
         ComputeJobDataHolder argHolder = 
SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller());
+        ExecutionContext executionContext = new ExecutionContext(descriptor, 
metadataBuilder, argHolder);
 
         if (target instanceof AnyNodeJobTarget) {
             Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes();
 
             return unmarshalResult(
-                    executeAsyncWithFailover(
-                            nodes, descriptor.units(), 
descriptor.jobClassName(), descriptor.options(), metadataBuilder,
-                            argHolder, cancellationToken
-                    ),
+                    executeAsyncWithFailover(nodes, executionContext, 
cancellationToken),
                     descriptor,
                     observableTimestampTracker
             );
@@ -183,7 +181,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
             QualifiedName tableName = colocatedTarget.tableName();
             Object key = colocatedTarget.key();
 
-            metadataBuilder.tableName(tableName.toCanonicalForm());
+            executionContext.metadataBuilder().tableName(tableName.toCanonicalForm());
 
             CompletableFuture<JobExecution<ComputeJobDataHolder>> jobFut;
             if (mapper != null) {
@@ -200,11 +198,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
                                                 key,
                                                 mapper
                                         ),
-                                        descriptor.units(),
-                                        descriptor.jobClassName(),
-                                        descriptor.options(),
-                                        metadataBuilder,
-                                        argHolder,
+                                        executionContext,
                                         cancellationToken
                                 )));
 
@@ -213,11 +207,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
                         .thenCompose(table -> submitColocatedInternal(
                                 table,
                                 (Tuple) key,
-                                descriptor.units(),
-                                descriptor.jobClassName(),
-                                descriptor.options(),
-                                metadataBuilder,
-                                argHolder,
+                                executionContext,
                                 cancellationToken
                         ));
             }
@@ -360,11 +350,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
                 executeOnOneNodeWithFailover(
                         node,
                         nextWorkerSelector,
-                        descriptor.units(),
-                        descriptor.jobClassName(),
-                        options,
-                        metadataBuilder,
-                        argHolder,
+                        new ExecutionContext(options, descriptor.units(), descriptor.jobClassName(), metadataBuilder, argHolder),
                         cancellationToken
                 ),
                 descriptor,
@@ -408,11 +394,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
     @Override
     public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover(
             Set<ClusterNode> nodes,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
         Set<ClusterNode> candidates = new HashSet<>();
@@ -431,16 +413,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
 
         NextWorkerSelector selector = new DeqNextWorkerSelector(new ConcurrentLinkedDeque<>(candidates));
 
-        return executeOnOneNodeWithFailover(
-                targetNode,
-                selector,
-                units,
-                jobClassName,
-                options,
-                metadataBuilder,
-                arg,
-                cancellationToken
-        );
+        return executeOnOneNodeWithFailover(targetNode, selector, executionContext, cancellationToken);
     }
 
     private static ClusterNode randomNode(Set<ClusterNode> nodes) {
@@ -457,41 +430,26 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
     private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover(
             ClusterNode targetNode,
             NextWorkerSelector nextWorkerSelector,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions jobExecutionOptions,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
-        ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
-
-        return executeOnOneNodeWithFailover(
-                targetNode, nextWorkerSelector, units, jobClassName, options, metadataBuilder,
-                arg, cancellationToken
-        );
+        return convertToComputeFuture(
+                executeOnOneNodeWithFailoverInternal(targetNode, nextWorkerSelector, executionContext, cancellationToken)
+        ).thenApply(JobExecutionWrapper::new);
     }
 
-    private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover(
+    private CompletableFuture<? extends JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailoverInternal(
             ClusterNode targetNode,
             NextWorkerSelector nextWorkerSelector,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
-        metadataBuilder.initiatorNode(nodeName);
+        executionContext.metadataBuilder().initiatorNode(nodeName);
 
         if (isLocal(targetNode)) {
-            return convertToComputeFuture(computeComponent.executeLocally(
-                    options, units, jobClassName, metadataBuilder, arg, cancellationToken
-            )).thenApply(JobExecutionWrapper::new);
+            return computeComponent.executeLocally(executionContext, cancellationToken);
         } else {
-            return convertToComputeFuture(computeComponent.executeRemotelyWithFailover(
-                    targetNode, nextWorkerSelector, options, units, jobClassName, metadataBuilder, arg, cancellationToken
-            )).thenApply(JobExecutionWrapper::new);
+            return computeComponent.executeRemotelyWithFailover(targetNode, nextWorkerSelector, executionContext, cancellationToken);
         }
     }
 
@@ -520,18 +478,15 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
     public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal(
             TableViewInternal table,
             Tuple key,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
         return primaryReplicaForPartitionByTupleKey(table, key)
                 .thenCompose(primaryNode -> executeOnOneNodeWithFailover(
                         primaryNode,
                         new NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, nodeProperties, table, key),
-                        units, jobClassName, options, metadataBuilder, arg, cancellationToken
+                        executionContext,
+                        cancellationToken
                 ));
     }
 
@@ -560,7 +515,8 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
                         new PartitionNextWorkerSelector(
                                 placementDriver, topologyService, clock, nodeProperties,
                                 table.zoneId(), table.tableId(), partition),
-                        units, jobClassName, options, metadataBuilder, arg, cancellationToken
+                        new ExecutionContext(options, units, jobClassName, metadataBuilder, arg),
+                        cancellationToken
                 ));
     }
 
@@ -709,23 +665,25 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
             byte[] payload,
             ClusterNode node,
             List<DeploymentUnit> deploymentUnits,
-            ReceiverExecutionOptions options) {
-        JobExecutionOptions jobOptions = JobExecutionOptions.builder()
+            ReceiverExecutionOptions options
+    ) {
+        ExecutionOptions jobOptions = ExecutionOptions.builder()
                 .priority(options.priority())
                 .maxRetries(options.maxRetries())
                 .executorType(options.executorType())
                 .build();
 
-        // Use Compute to execute receiver on the target node with failover, class loading, scheduling.
-        return executeAsyncWithFailover(
-                Set.of(node),
+        ExecutionContext executionContext = new ExecutionContext(
+                jobOptions,
                 deploymentUnits,
                 getReceiverJobClassName(options.executorType()),
-                jobOptions,
                 ComputeEventMetadata.builder(Type.DATA_RECEIVER),
-                SharedComputeUtils.marshalArgOrResult(payload, null),
-                null
-        ).thenCompose(JobExecution::resultAsync)
+                SharedComputeUtils.marshalArgOrResult(payload, null)
+        );
+
+        // Use Compute to execute receiver on the target node with failover, class loading, scheduling.
+        return executeAsyncWithFailover(Set.of(node), executionContext, null)
+                .thenCompose(JobExecution::resultAsync)
                 .handle((res, err) -> {
                     if (err != null) {
                         if (err.getCause() instanceof ComputeException) {
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index 5702442d123..868654c19dc 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -47,21 +47,13 @@ public interface IgniteComputeInternal extends IgniteCompute {
      * candidate nodes.
      *
      * @param nodes Candidate nodes; In case target node left the cluster, the job will be restarted on one of them.
-     * @param units Deployment units. Can be empty.
-     * @param jobClassName Name of the job class to execute.
-     * @param options Job execution options.
-     * @param metadataBuilder Event metadata builder.
-     * @param arg Argument of the job.
+     * @param executionContext Execution context.
      * @param cancellationToken Cancellation token or {@code null}.
      * @return CompletableFuture Job result.
      */
     CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover(
             Set<ClusterNode> nodes,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     );
 
@@ -71,22 +63,14 @@ public interface IgniteComputeInternal extends IgniteCompute {
      *
      * @param table Table whose key is used to determine the node to execute the job on.
      * @param key Key that identifies the node to execute the job on.
-     * @param units Deployment units. Can be empty.
-     * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
-     * @param metadataBuilder Event metadata builder.
-     * @param arg Argument of the job.
+     * @param executionContext Execution context.
      * @param cancellationToken Cancellation token or {@code null}.
      * @return Job execution object.
      */
     CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal(
             TableViewInternal table,
             Tuple key,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            JobExecutionOptions options,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg,
+            ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     );
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
index 554f38c573a..ea1654db06a 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java
@@ -17,11 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Compute job starter interface.
@@ -31,18 +27,8 @@ public interface JobStarter {
     /**
      * Start compute job.
      *
-     * @param options Compute job execution options.
-     * @param units Deployment units. Can be empty.
-     * @param jobClassName Name of the job class to execute.
-     * @param metadataBuilder Event metadata builder.
-     * @param args Arguments of the job.
+     * @param executionContext Execution context.
      * @return Future of the job execution object.
      */
-    CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> start(
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder args
-    );
+    CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> start(ExecutionContext executionContext);
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
deleted file mode 100644
index d1170579fa2..00000000000
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.compute;
-
-import java.util.List;
-import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Captures the context of a remote job execution.
- */
-class RemoteExecutionContext {
-    private final ExecutionOptions executionOptions;
-
-    private final List<DeploymentUnit> units;
-
-    private final String jobClassName;
-
-    private final ComputeEventMetadataBuilder metadataBuilder;
-
-    private final ComputeJobDataHolder arg;
-
-    RemoteExecutionContext(
-            ExecutionOptions executionOptions,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder arg
-    ) {
-        this.executionOptions = executionOptions;
-        this.units = units;
-        this.jobClassName = jobClassName;
-        this.metadataBuilder = metadataBuilder;
-        this.arg = arg;
-    }
-
-    ExecutionOptions executionOptions() {
-        return executionOptions;
-    }
-
-    List<DeploymentUnit> units() {
-        return units;
-    }
-
-    String jobClassName() {
-        return jobClassName;
-    }
-
-    ComputeEventMetadataBuilder metadataBuilder() {
-        return metadataBuilder;
-    }
-
-    @Nullable ComputeJobDataHolder arg() {
-        return arg;
-    }
-}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index 5ea525a33ab..1e3fc0edd3a 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -46,8 +46,8 @@ import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.ComputeMessageTypes;
 import org.apache.ignite.internal.compute.ComputeMessagesFactory;
 import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionContext;
 import org.apache.ignite.internal.compute.ExecutionManager;
-import org.apache.ignite.internal.compute.ExecutionOptions;
 import org.apache.ignite.internal.compute.JobStarter;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
@@ -171,31 +171,20 @@ public class ComputeMessaging {
      * Submit Compute job to execution on remote node.
      *
      * @param remoteNode The job will be executed on this node.
-     * @param options Job execution options.
-     * @param units Deployment units. Can be empty.
-     * @param jobClassName Name of the job class to execute.
-     * @param metadataBuilder Event metadata builder.
-     * @param input Arguments of the job.
+     * @param executionContext Execution context.
      * @return Job id future that will be completed when the job is submitted on the remote node.
      */
-    public CompletableFuture<UUID> remoteExecuteRequestAsync(
-            ClusterNode remoteNode,
-            ExecutionOptions options,
-            List<DeploymentUnit> units,
-            String jobClassName,
-            ComputeEventMetadataBuilder metadataBuilder,
-            @Nullable ComputeJobDataHolder input
-    ) {
-        List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream()
+    public CompletableFuture<UUID> remoteExecuteRequestAsync(ClusterNode remoteNode, ExecutionContext executionContext) {
+        List<DeploymentUnitMsg> deploymentUnitMsgs = executionContext.units().stream()
                 .map(ComputeUtils::toDeploymentUnitMsg)
                 .collect(toList());
 
         ExecuteRequestV2 executeRequest = messagesFactory.executeRequestV2()
-                .executeOptions(options)
+                .executeOptions(executionContext.options())
                 .deploymentUnits(deploymentUnitMsgs)
-                .jobClassName(jobClassName)
-                .metadataBuilder(metadataBuilder)
-                .input(input)
+                .jobClassName(executionContext.jobClassName())
+                .metadataBuilder(executionContext.metadataBuilder())
+                .input(executionContext.arg())
                 .build();
 
         return invoke(remoteNode, executeRequest)
@@ -209,7 +198,7 @@ public class ComputeMessaging {
                 ? ((ExecuteRequestV2) request).metadataBuilder()
                 : ComputeEventMetadata.builder();
 
-        starter.start(request.executeOptions(), units, request.jobClassName(), metadataBuilder, request.input())
+        starter.start(new ExecutionContext(request.executeOptions(), units, request.jobClassName(), metadataBuilder, request.input()))
                 .whenComplete((execution, err) -> {
                             if (err != null) {
                                 sendExecuteResponse(null, err, sender, correlationId);
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 441a51601b6..3ea5379a0d0 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -220,7 +220,8 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest {
         CancelHandle cancelHandle = CancelHandle.create();
 
         CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executionFut = computeComponent.executeLocally(
-                DEFAULT, List.of(), SimpleJob.class.getName(), ComputeEventMetadata.builder(), null, cancelHandle.token()
+                new ExecutionContext(DEFAULT, List.of(), SimpleJob.class.getName(), ComputeEventMetadata.builder(), null),
+                cancelHandle.token()
         );
 
         assertFalse(infiniteFuture.isDone());
@@ -724,13 +725,16 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest {
     }
 
     private CompletableFuture<String> executeLocally(List<DeploymentUnit> units, String jobClassName) {
-        return computeComponent.executeLocally(DEFAULT, units, jobClassName, ComputeEventMetadata.builder(), null, null)
-                .thenCompose(ComputeComponentImplTest::unwrapResult);
+        return computeComponent.executeLocally(
+                new ExecutionContext(DEFAULT, units, jobClassName, ComputeEventMetadata.builder(), null),
+                null
+        ).thenCompose(ComputeComponentImplTest::unwrapResult);
     }
 
     private JobExecution<ComputeJobDataHolder> executeLocally(String jobClassName, @Nullable CancellationToken cancellationToken) {
         CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executionFut = computeComponent.executeLocally(
-                DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null, cancellationToken
+                new ExecutionContext(DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null),
+                cancellationToken
         );
         assertThat(executionFut, willCompleteSuccessfully());
         return executionFut.join();
@@ -742,15 +746,20 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest {
             @Nullable CancellationToken cancellationToken
     ) {
         CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executionFut = computeComponent.executeRemotely(
-                remoteNode, DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), arg, cancellationToken
+                remoteNode,
+                new ExecutionContext(DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), arg),
+                cancellationToken
         );
         assertThat(executionFut, willCompleteSuccessfully());
         return executionFut.join();
     }
 
     private CompletableFuture<String> executeRemotely(String jobClassName) {
-        return computeComponent.executeRemotely(remoteNode, DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null, null)
-                .thenCompose(ComputeComponentImplTest::unwrapResult);
+        return computeComponent.executeRemotely(
+                remoteNode,
+                new ExecutionContext(DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null),
+                null
+        ).thenCompose(ComputeComponentImplTest::unwrapResult);
     }
 
     private static CompletableFuture<String> unwrapResult(JobExecution<ComputeJobDataHolder> execution) {
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 788c71fa0cb..7f6f5479f41 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doReturn;
@@ -42,6 +43,7 @@ import static org.mockito.Mockito.when;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.BroadcastExecution;
 import org.apache.ignite.compute.BroadcastJobTarget;
@@ -75,6 +77,7 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatcher;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Spy;
@@ -143,8 +146,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("jobResponse")
         );
 
-        verify(computeComponent)
-                .executeLocally(eq(ExecutionOptions.DEFAULT), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull());
+        verifyExecuteLocally(ExecutionOptions.DEFAULT);
 
         assertEquals(jobTimestamp, observableTimestampTracker.get());
     }
@@ -185,8 +187,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("jobResponse")
         );
 
-        verify(computeComponent)
-                .executeLocally(eq(ExecutionOptions.DEFAULT), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull());
+        verifyExecuteLocally(ExecutionOptions.DEFAULT);
 
         assertEquals(jobTimestamp, observableTimestampTracker.get());
     }
@@ -222,8 +223,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("jobResponse")
         );
 
-        verify(computeComponent)
-                .executeLocally(eq(expectedOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull());
+        verifyExecuteLocally(expectedOptions);
     }
 
     @Test
@@ -308,28 +308,38 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
     }
 
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()))
+        when(computeComponent.executeLocally(argThat(ctxEq(executionOptions)), isNull()))
                 .thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult(
                         "jobResponse", null, jobTimestamp.longValue()), localNode)));
     }
 
     private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions, CancellationToken token) {
-        when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), eq(token)))
+        when(computeComponent.executeLocally(argThat(ctxEq(executionOptions)), eq(token)))
                 .thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult(
                         "jobResponse", null, jobTimestamp.longValue()), localNode)));
     }
 
     private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions options) {
         when(computeComponent.executeRemotelyWithFailover(
-                eq(remoteNode), any(), eq(options), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()
+                eq(remoteNode), any(), argThat(ctxEq(options)), isNull()
         )).thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult(
                 "remoteResponse", null, jobTimestamp.longValue()), remoteNode)));
     }
 
+    private void verifyExecuteLocally(ExecutionOptions options) {
+        verify(computeComponent)
+                .executeLocally(argThat(ctxEq(options)), isNull());
+    }
+
     private void verifyExecuteRemotelyWithFailover(ExecutionOptions options) {
-        verify(computeComponent).executeRemotelyWithFailover(
-                eq(remoteNode), any(), eq(options), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()
-        );
+        verify(computeComponent)
+                .executeRemotelyWithFailover(eq(remoteNode), any(), argThat(ctxEq(options)), isNull());
+    }
+
+    private ArgumentMatcher<ExecutionContext> ctxEq(ExecutionOptions options) {
+        return ctx -> Objects.equals(ctx.options(), options)
+                && Objects.equals(ctx.units(), testDeploymentUnits)
+                && Objects.equals(ctx.jobClassName(), JOB_CLASS_NAME);
     }
 
     private static <R> CancellableJobExecution<R> completedExecution(@Nullable R result, ClusterNode node) {

Reply via email to