This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new e2594a72ee7 IGNITE-26331 Introduce compute execution context (#6508) e2594a72ee7 is described below commit e2594a72ee70d4e40d91d25c7028b7e31fec4ea9 Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Fri Aug 29 12:43:05 2025 +0300 IGNITE-26331 Introduce compute execution context (#6508) --- .../ClientComputeExecuteColocatedRequest.java | 17 +-- .../compute/ClientComputeExecuteRequest.java | 3 +- .../apache/ignite/client/fakes/FakeCompute.java | 31 +++--- .../ignite/internal/compute/ComputeComponent.java | 36 ++----- .../internal/compute/ComputeComponentImpl.java | 56 +++------- .../internal/compute/ComputeJobFailover.java | 38 ++----- .../ignite/internal/compute/ExecutionContext.java | 118 +++++++++++++++++++++ .../ignite/internal/compute/IgniteComputeImpl.java | 108 ++++++------------- .../internal/compute/IgniteComputeInternal.java | 24 +---- .../apache/ignite/internal/compute/JobStarter.java | 18 +--- .../internal/compute/RemoteExecutionContext.java | 72 ------------- .../compute/messaging/ComputeMessaging.java | 29 ++--- .../internal/compute/ComputeComponentImplTest.java | 23 ++-- .../internal/compute/IgniteComputeImplTest.java | 34 +++--- 14 files changed, 260 insertions(+), 347 deletions(-) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java index a4f6926df47..f2bb27b1e57 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.compute.ClientComputeExe import static org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple; +import static org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob; import static org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId; import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID; import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB; @@ -31,10 +32,10 @@ import org.apache.ignite.client.handler.ClientContext; import org.apache.ignite.client.handler.NotificationSender; import org.apache.ignite.client.handler.ResponseWriter; import org.apache.ignite.compute.JobExecution; -import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker; import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.Job; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.compute.ComputeJobDataHolder; +import org.apache.ignite.internal.compute.ExecutionContext; import org.apache.ignite.internal.compute.IgniteComputeInternal; import org.apache.ignite.internal.compute.events.ComputeEventMetadata; import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type; @@ -71,7 +72,7 @@ public class ClientComputeExecuteColocatedRequest { BitSet noValueSet = in.unpackBitSet(); byte[] tupleBytes = in.readBinary(); - Job job = ClientComputeJobUnpacker.unpackJob(in, clientContext.hasFeature(PLATFORM_COMPUTE_JOB)); + Job job = unpackJob(in, clientContext.hasFeature(PLATFORM_COMPUTE_JOB)); unpackTaskId(in, clientContext.hasFeature(COMPUTE_TASK_ID)); // Placeholder for a possible future usage return readTableAsync(tableId, tables).thenCompose(table -> readTuple(schemaId, noValueSet, tupleBytes, table, true) @@ -84,11 +85,13 @@ public class ClientComputeExecuteColocatedRequest { CompletableFuture<JobExecution<ComputeJobDataHolder>> jobExecutionFut = compute.submitColocatedInternal( table, keyTuple, - job.deploymentUnits(), - job.jobClassName(), - job.options(), - metadataBuilder, - job.arg(), + new ExecutionContext( + job.options(), + job.deploymentUnits(), + job.jobClassName(), + metadataBuilder, + job.arg() + ), null ); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java index 8c5eb87bb76..02f496299b3 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.Job; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.compute.ComputeJobDataHolder; +import org.apache.ignite.internal.compute.ExecutionContext; import org.apache.ignite.internal.compute.IgniteComputeInternal; import org.apache.ignite.internal.compute.MarshallerProvider; import org.apache.ignite.internal.compute.events.ComputeEventMetadata; @@ -86,7 +87,7 @@ public class ClientComputeExecuteRequest { .clientAddress(clientContext.remoteAddress().toString()); CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut = compute.executeAsyncWithFailover( - candidates, job.deploymentUnits(), job.jobClassName(), job.options(), metadataBuilder, job.arg(), null + candidates, new ExecutionContext(job.options(), job.deploymentUnits(), job.jobClassName(), metadataBuilder, job.arg()), null ); sendResultAndState(executionFut, notificationSender); diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java index 934085e4736..8368b46b091 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java @@ -57,6 +57,7 @@ import org.apache.ignite.compute.task.TaskExecution; import org.apache.ignite.deployment.DeploymentUnit; import org.apache.ignite.internal.compute.ComputeJobDataHolder; import org.apache.ignite.internal.compute.ComputeUtils; +import org.apache.ignite.internal.compute.ExecutionContext; import org.apache.ignite.internal.compute.HybridTimestampProvider; import org.apache.ignite.internal.compute.IgniteComputeInternal; import org.apache.ignite.internal.compute.JobExecutionContextImpl; @@ -107,14 +108,12 @@ public class FakeCompute implements IgniteComputeInternal { @Override public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover( Set<ClusterNode> nodes, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, - @Nullable CancellationToken cancellationToken) { + ExecutionContext executionContext, + @Nullable CancellationToken cancellationToken + ) { + String jobClassName = executionContext.jobClassName(); if (Objects.equals(jobClassName, GET_UNITS)) { - String unitString = units.stream().map(DeploymentUnit::render).collect(Collectors.joining(",")); + String unitString = executionContext.units().stream().map(DeploymentUnit::render).collect(Collectors.joining(",")); return completedExecution(unitString); } @@ -135,7 +134,7 @@ public class FakeCompute implements IgniteComputeInternal { ComputeJob<Object, Object> job = ComputeUtils.instantiateJob(jobClass); CompletableFuture<Object> jobFut = job.executeAsync( new JobExecutionContextImpl(ignite, new AtomicBoolean(), jobClassLoader, null), - SharedComputeUtils.unmarshalArgOrResult(arg, null, null)); + SharedComputeUtils.unmarshalArgOrResult(executionContext.arg(), null, null)); return jobExecution(jobFut != null ? jobFut : nullCompletedFuture()); } @@ -154,11 +153,7 @@ public class FakeCompute implements IgniteComputeInternal { public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal( TableViewInternal table, Tuple key, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - ComputeJobDataHolder args, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { return jobExecution(future != null @@ -202,11 +197,11 @@ public class FakeCompute implements IgniteComputeInternal { return executeAsyncWithFailover( nodes, - descriptor.units(), - descriptor.jobClassName(), - descriptor.options(), - ComputeEventMetadata.builder(), - SharedComputeUtils.marshalArgOrResult(arg, null, observableTimestamp.longValue()), + new ExecutionContext( + descriptor, + ComputeEventMetadata.builder(), + SharedComputeUtils.marshalArgOrResult(arg, null, observableTimestamp.longValue()) + ), cancellationToken ).thenApply(internalExecution -> unmarshalingExecution(descriptor, internalExecution)); } else if (target instanceof ColocatedJobTarget) { diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java index 5f8091c5bb3..a8f4da83a62 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java @@ -38,20 +38,12 @@ public interface ComputeComponent extends IgniteComponent { /** * Executes a job of the given class on the current node. * - * @param options Job execution options. - * @param units Deployment units which will be loaded for execution. - * @param jobClassName Name of the job class. - * @param metadataBuilder Event metadata builder. - * @param arg Job argument. + * @param executionContext Execution context. * @param cancellationToken Cancellation token or {@code null}. * @return Future of the job execution object which will be completed when the job is submitted. */ CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executeLocally( - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ); @@ -59,21 +51,13 @@ public interface ComputeComponent extends IgniteComponent { * Executes a job of the given class on a remote node. * * @param remoteNode Remote node name. - * @param options Job execution options. - * @param units Deployment units which will be loaded for execution. - * @param jobClassName Name of the job class. - * @param metadataBuilder Event metadata builder. - * @param arg Job argument. + * @param executionContext Execution context. * @param cancellationToken Cancellation token or {@code null}. * @return Future of the job execution object which will be completed when the job is submitted. */ CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executeRemotely( ClusterNode remoteNode, - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ); @@ -83,22 +67,14 @@ public interface ComputeComponent extends IgniteComponent { * * @param remoteNode Remote node name. * @param nextWorkerSelector The selector that returns the next worker to execute job on. - * @param options Job execution options. - * @param units Deployment units which will be loaded for execution. - * @param jobClassName Name of the job class. - * @param metadataBuilder Event metadata builder. - * @param arg Job argument. + * @param executionContext Execution context. * @param cancellationToken Cancellation token or {@code null}. * @return Future of the job execution object which will be completed when the job is submitted. */ CompletableFuture<JobExecution<ComputeJobDataHolder>> executeRemotelyWithFailover( ClusterNode remoteNode, NextWorkerSelector nextWorkerSelector, - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java index 635beab11e8..47feca02b11 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -126,11 +126,7 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide @Override public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executeLocally( - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { if (!busyLock.enterBusy()) { @@ -138,14 +134,12 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide } try { - CompletableFuture<JobContext> classLoaderFut = jobContextManager.acquireClassLoader(units); + CompletableFuture<JobContext> classLoaderFut = jobContextManager.acquireClassLoader(executionContext.units()); CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> future = - mapClassLoaderExceptions(classLoaderFut, jobClassName) + mapClassLoaderExceptions(classLoaderFut, executionContext.jobClassName()) .thenApply(context -> { - JobExecutionInternal<ComputeJobDataHolder> execution = execJob( - context, options, jobClassName, metadataBuilder, arg - ); + JobExecutionInternal<ComputeJobDataHolder> execution = execJob(context, executionContext); execution.resultAsync().whenComplete((result, e) -> context.close()); inFlightFutures.registerFuture(execution.resultAsync()); @@ -222,11 +216,7 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide @Override public CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executeRemotely( ClusterNode remoteNode, - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { if (!busyLock.enterBusy()) { @@ -234,14 +224,7 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide } try { - CompletableFuture<UUID> jobIdFuture = messaging.remoteExecuteRequestAsync( - remoteNode, - options, - units, - jobClassName, - metadataBuilder, - arg - ); + CompletableFuture<UUID> jobIdFuture = messaging.remoteExecuteRequestAsync(remoteNode, executionContext); inFlightFutures.registerFuture(jobIdFuture); @@ -267,16 +250,12 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeRemotelyWithFailover( ClusterNode remoteNode, NextWorkerSelector nextWorkerSelector, - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { return ComputeJobFailover.failSafeExecute( this, logicalTopologyService, topologyService, failoverExecutor, eventLog, - remoteNode, nextWorkerSelector, options, units, jobClassName, metadataBuilder, arg + remoteNode, nextWorkerSelector, executionContext ) .thenApply(execution -> { // Do not add cancel action to the underlying jobs, let the FailSafeJobExecution handle it. @@ -329,8 +308,7 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide @Override public CompletableFuture<Void> startAsync(ComponentContext componentContext) { executor.start(); - messaging.start((options, units, jobClassName, metadataBuilder, arg) -> - executeLocally(options, units, jobClassName, metadataBuilder, arg, null)); + messaging.start(executionContext -> executeLocally(executionContext, null)); executionManager.start(); computeViewProvider.init(executionManager); @@ -356,15 +334,15 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide return nullCompletedFuture(); } - private JobExecutionInternal<ComputeJobDataHolder> execJob( - JobContext context, - ExecutionOptions options, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg - ) { + private JobExecutionInternal<ComputeJobDataHolder> execJob(JobContext context, ExecutionContext executionContext) { try { - return executor.executeJob(options, jobClassName, context.classLoader(), metadataBuilder, arg); + return executor.executeJob( + executionContext.options(), + executionContext.jobClassName(), + context.classLoader(), + executionContext.metadataBuilder(), + executionContext.arg() + ); } catch (Throwable e) { context.close(); throw e; diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java index 2646411ea8c..293667baa50 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java @@ -17,18 +17,15 @@ package org.apache.ignite.internal.compute; -import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; -import org.apache.ignite.deployment.DeploymentUnit; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.compute.events.ComputeEventMetadata; -import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder; import org.apache.ignite.internal.compute.events.ComputeEventsFactory; import org.apache.ignite.internal.eventlog.api.EventLog; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -37,7 +34,6 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.lang.ErrorGroups.Compute; import org.apache.ignite.network.ClusterNode; -import org.jetbrains.annotations.Nullable; /** * This is a helper class for {@link ComputeComponent} to handle job failures. You can think about this class as a "retryable compute job @@ -87,7 +83,7 @@ class ComputeJobFailover { /** * Context of the called job. Captures deployment units, jobClassName and arguments. */ - private final RemoteExecutionContext jobContext; + private final ExecutionContext jobContext; /** * Job id of the execution. @@ -120,11 +116,7 @@ class ComputeJobFailover { EventLog eventLog, ClusterNode workerNode, NextWorkerSelector nextWorkerSelector, - ExecutionOptions executionOptions, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg + ExecutionContext executionContext ) { this.computeComponent = computeComponent; this.logicalTopologyService = logicalTopologyService; @@ -135,9 +127,9 @@ class ComputeJobFailover { this.nextWorkerSelector = nextWorkerSelector; // Assign failover job id so that it is consistent for any remote job. - metadataBuilder.jobId(jobId); + executionContext.metadataBuilder().jobId(jobId); - this.jobContext = new RemoteExecutionContext(executionOptions, units, jobClassName, metadataBuilder, arg); + this.jobContext = executionContext; } static CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> failSafeExecute( @@ -148,11 +140,7 @@ class ComputeJobFailover { EventLog eventLog, ClusterNode workerNode, NextWorkerSelector nextWorkerSelector, - ExecutionOptions executionOptions, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg + ExecutionContext executionContext ) { return new ComputeJobFailover( computeComponent, @@ -162,11 +150,7 @@ class ComputeJobFailover { eventLog, workerNode, nextWorkerSelector, - executionOptions, - units, - jobClassName, - metadataBuilder, - arg + executionContext ).execute(); } @@ -191,15 +175,9 @@ class ComputeJobFailover { private CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> launchJobOn(ClusterNode runningWorkerNode) { if (runningWorkerNode.name().equals(topologyService.localMember().name())) { - return computeComponent.executeLocally( - jobContext.executionOptions(), jobContext.units(), jobContext.jobClassName(), - jobContext.metadataBuilder(), jobContext.arg(), null - ); + return computeComponent.executeLocally(jobContext, null); } else { - return computeComponent.executeRemotely( - runningWorkerNode, jobContext.executionOptions(), jobContext.units(), jobContext.jobClassName(), - jobContext.metadataBuilder(), jobContext.arg(), null - ); + return computeComponent.executeRemotely(runningWorkerNode, jobContext, null); } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java new file mode 100644 index 00000000000..5a97a13d26d --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import java.util.List; +import org.apache.ignite.compute.JobDescriptor; +import org.apache.ignite.compute.JobExecutionOptions; +import org.apache.ignite.deployment.DeploymentUnit; +import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder; +import org.jetbrains.annotations.Nullable; + +/** + * Captures the context of a job execution. + */ +public class ExecutionContext { + private final ExecutionOptions options; + + private final List<DeploymentUnit> units; + + private final String jobClassName; + + private final ComputeEventMetadataBuilder metadataBuilder; + + private final ComputeJobDataHolder arg; + + /** + * Creates new execution context. + * + * @param options Job execution options. + * @param units Deployment units which will be loaded for execution. + * @param jobClassName Name of the job class. + * @param metadataBuilder Event metadata builder. + * @param arg Job argument. + */ + public ExecutionContext( + ExecutionOptions options, + List<DeploymentUnit> units, + String jobClassName, + ComputeEventMetadataBuilder metadataBuilder, + @Nullable ComputeJobDataHolder arg + ) { + this.options = options; + this.units = units; + this.jobClassName = jobClassName; + this.metadataBuilder = metadataBuilder; + this.arg = arg; + } + + /** + * Creates new execution context. + * + * @param jobExecutionOptions Job execution options. + * @param units Deployment units which will be loaded for execution. + * @param jobClassName Name of the job class. + * @param metadataBuilder Event metadata builder. + * @param arg Job argument. + */ + public ExecutionContext( + JobExecutionOptions jobExecutionOptions, + List<DeploymentUnit> units, + String jobClassName, + ComputeEventMetadataBuilder metadataBuilder, + @Nullable ComputeJobDataHolder arg + ) { + this(ExecutionOptions.from(jobExecutionOptions), units, jobClassName, metadataBuilder, arg); + } + + /** + * Creates new execution context. Takes execution options, deployment units and job class name from a job descriptor. + * + * @param descriptor Job descriptor. + * @param metadataBuilder Event metadata builder. + * @param arg Job argument. + */ + public <T, R> ExecutionContext( + JobDescriptor<T, R> descriptor, + ComputeEventMetadataBuilder metadataBuilder, + @Nullable ComputeJobDataHolder arg + ) { + this(descriptor.options(), descriptor.units(), descriptor.jobClassName(), metadataBuilder, arg); + } + + public ExecutionOptions options() { + return options; + } + + public List<DeploymentUnit> units() { + return units; + } + + public String jobClassName() { + return jobClassName; + } + + public ComputeEventMetadataBuilder metadataBuilder() { + return metadataBuilder; + } + + @Nullable + public ComputeJobDataHolder arg() { + return arg; + } +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index cdb08ae8949..a54f82f6de0 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -163,15 +163,13 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive Objects.requireNonNull(descriptor); ComputeJobDataHolder argHolder = SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller()); + ExecutionContext executionContext = new ExecutionContext(descriptor, metadataBuilder, argHolder); if (target instanceof AnyNodeJobTarget) { Set<ClusterNode> nodes = ((AnyNodeJobTarget) target).nodes(); return unmarshalResult( - executeAsyncWithFailover( - nodes, descriptor.units(), descriptor.jobClassName(), descriptor.options(), metadataBuilder, - argHolder, cancellationToken - ), + executeAsyncWithFailover(nodes, executionContext, cancellationToken), descriptor, observableTimestampTracker ); @@ -183,7 +181,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive QualifiedName tableName = colocatedTarget.tableName(); Object key = colocatedTarget.key(); - metadataBuilder.tableName(tableName.toCanonicalForm()); + executionContext.metadataBuilder().tableName(tableName.toCanonicalForm()); CompletableFuture<JobExecution<ComputeJobDataHolder>> jobFut; if (mapper != null) { @@ -200,11 +198,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive key, mapper ), - descriptor.units(), - descriptor.jobClassName(), - descriptor.options(), - metadataBuilder, - argHolder, + executionContext, cancellationToken ))); @@ -213,11 +207,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive .thenCompose(table -> submitColocatedInternal( table, (Tuple) key, - descriptor.units(), - descriptor.jobClassName(), - descriptor.options(), - metadataBuilder, - argHolder, + executionContext, cancellationToken )); } @@ -360,11 +350,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive executeOnOneNodeWithFailover( node, nextWorkerSelector, - descriptor.units(), - descriptor.jobClassName(), - options, - metadataBuilder, - argHolder, + new ExecutionContext(options, descriptor.units(), descriptor.jobClassName(), metadataBuilder, argHolder), cancellationToken ), descriptor, @@ -408,11 +394,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive @Override public CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover( Set<ClusterNode> nodes, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { Set<ClusterNode> candidates = new HashSet<>(); @@ -431,16 +413,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive NextWorkerSelector selector = new DeqNextWorkerSelector(new ConcurrentLinkedDeque<>(candidates)); - return executeOnOneNodeWithFailover( - targetNode, - selector, - units, - jobClassName, - options, - metadataBuilder, - arg, - cancellationToken - ); + return executeOnOneNodeWithFailover(targetNode, selector, executionContext, cancellationToken); } private static ClusterNode randomNode(Set<ClusterNode> nodes) { @@ -457,41 +430,26 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover( ClusterNode targetNode, NextWorkerSelector nextWorkerSelector, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions jobExecutionOptions, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { - ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions); - - return executeOnOneNodeWithFailover( - targetNode, nextWorkerSelector, units, jobClassName, options, metadataBuilder, - arg, cancellationToken - ); + return convertToComputeFuture( + executeOnOneNodeWithFailoverInternal(targetNode, nextWorkerSelector, executionContext, cancellationToken) + ).thenApply(JobExecutionWrapper::new); } - private CompletableFuture<JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailover( + private CompletableFuture<? extends JobExecution<ComputeJobDataHolder>> executeOnOneNodeWithFailoverInternal( ClusterNode targetNode, NextWorkerSelector nextWorkerSelector, - List<DeploymentUnit> units, - String jobClassName, - ExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { - metadataBuilder.initiatorNode(nodeName); + executionContext.metadataBuilder().initiatorNode(nodeName); if (isLocal(targetNode)) { - return convertToComputeFuture(computeComponent.executeLocally( - options, units, jobClassName, metadataBuilder, arg, cancellationToken - )).thenApply(JobExecutionWrapper::new); + return computeComponent.executeLocally(executionContext, cancellationToken); } else { - return convertToComputeFuture(computeComponent.executeRemotelyWithFailover( - targetNode, nextWorkerSelector, options, units, jobClassName, metadataBuilder, arg, cancellationToken - )).thenApply(JobExecutionWrapper::new); + return computeComponent.executeRemotelyWithFailover(targetNode, nextWorkerSelector, executionContext, cancellationToken); } } @@ -520,18 +478,15 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal( TableViewInternal table, Tuple key, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ) { return primaryReplicaForPartitionByTupleKey(table, key) .thenCompose(primaryNode -> executeOnOneNodeWithFailover( primaryNode, new NextColocatedWorkerSelector<>(placementDriver, topologyService, clock, nodeProperties, table, key), - units, jobClassName, options, metadataBuilder, arg, cancellationToken + executionContext, + cancellationToken )); } @@ -560,7 +515,8 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive new PartitionNextWorkerSelector( placementDriver, topologyService, clock, nodeProperties, table.zoneId(), table.tableId(), partition), - units, jobClassName, options, metadataBuilder, arg, cancellationToken + new ExecutionContext(options, units, jobClassName, metadataBuilder, arg), + cancellationToken )); } @@ -709,23 +665,25 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive byte[] payload, ClusterNode node, List<DeploymentUnit> deploymentUnits, - ReceiverExecutionOptions options) { - JobExecutionOptions jobOptions = JobExecutionOptions.builder() + ReceiverExecutionOptions options + ) { + ExecutionOptions jobOptions = ExecutionOptions.builder() .priority(options.priority()) .maxRetries(options.maxRetries()) .executorType(options.executorType()) .build(); - // Use Compute to execute receiver on the target node with failover, class loading, scheduling. - return executeAsyncWithFailover( - Set.of(node), + ExecutionContext executionContext = new ExecutionContext( + jobOptions, deploymentUnits, getReceiverJobClassName(options.executorType()), - jobOptions, ComputeEventMetadata.builder(Type.DATA_RECEIVER), - SharedComputeUtils.marshalArgOrResult(payload, null), - null - ).thenCompose(JobExecution::resultAsync) + SharedComputeUtils.marshalArgOrResult(payload, null) + ); + + // Use Compute to execute receiver on the target node with failover, class loading, scheduling. + return executeAsyncWithFailover(Set.of(node), executionContext, null) + .thenCompose(JobExecution::resultAsync) .handle((res, err) -> { if (err != null) { if (err.getCause() instanceof ComputeException) { diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java index 5702442d123..868654c19dc 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java @@ -47,21 +47,13 @@ public interface IgniteComputeInternal extends IgniteCompute { * candidate nodes. * * @param nodes Candidate nodes; In case target node left the cluster, the job will be restarted on one of them. - * @param units Deployment units. Can be empty. - * @param jobClassName Name of the job class to execute. - * @param options Job execution options. - * @param metadataBuilder Event metadata builder. - * @param arg Argument of the job. + * @param executionContext Execution context. * @param cancellationToken Cancellation token or {@code null}. * @return CompletableFuture Job result. */ CompletableFuture<JobExecution<ComputeJobDataHolder>> executeAsyncWithFailover( Set<ClusterNode> nodes, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ); @@ -71,22 +63,14 @@ public interface IgniteComputeInternal extends IgniteCompute { * * @param table Table whose key is used to determine the node to execute the job on. * @param key Key that identifies the node to execute the job on. - * @param units Deployment units. Can be empty. - * @param jobClassName Name of the job class to execute. - * @param options job execution options (priority, max retries). - * @param metadataBuilder Event metadata builder. - * @param arg Argument of the job. + * @param executionContext Execution context. * @param cancellationToken Cancellation token or {@code null}. * @return Job execution object. */ CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInternal( TableViewInternal table, Tuple key, - List<DeploymentUnit> units, - String jobClassName, - JobExecutionOptions options, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg, + ExecutionContext executionContext, @Nullable CancellationToken cancellationToken ); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java index 554f38c573a..ea1654db06a 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobStarter.java @@ -17,11 +17,7 @@ package org.apache.ignite.internal.compute; -import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.ignite.deployment.DeploymentUnit; -import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder; -import org.jetbrains.annotations.Nullable; /** * Compute job starter interface. @@ -31,18 +27,8 @@ public interface JobStarter { /** * Start compute job. * - * @param options Compute job execution options. - * @param units Deployment units. Can be empty. - * @param jobClassName Name of the job class to execute. - * @param metadataBuilder Event metadata builder. - * @param args Arguments of the job. + * @param executionContext Execution context. * @return Future of the job execution object. */ - CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> start( - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder args - ); + CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> start(ExecutionContext executionContext); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java deleted file mode 100644 index d1170579fa2..00000000000 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.compute; - -import java.util.List; -import org.apache.ignite.deployment.DeploymentUnit; -import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder; -import org.jetbrains.annotations.Nullable; - -/** - * Captures the context of a remote job execution. - */ -class RemoteExecutionContext { - private final ExecutionOptions executionOptions; - - private final List<DeploymentUnit> units; - - private final String jobClassName; - - private final ComputeEventMetadataBuilder metadataBuilder; - - private final ComputeJobDataHolder arg; - - RemoteExecutionContext( - ExecutionOptions executionOptions, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder arg - ) { - this.executionOptions = executionOptions; - this.units = units; - this.jobClassName = jobClassName; - this.metadataBuilder = metadataBuilder; - this.arg = arg; - } - - ExecutionOptions executionOptions() { - return executionOptions; - } - - List<DeploymentUnit> units() { - return units; - } - - String jobClassName() { - return jobClassName; - } - - ComputeEventMetadataBuilder metadataBuilder() { - return metadataBuilder; - } - - @Nullable ComputeJobDataHolder arg() { - return arg; - } -} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java index 5ea525a33ab..1e3fc0edd3a 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java @@ -46,8 +46,8 @@ import org.apache.ignite.internal.compute.ComputeJobDataHolder; import org.apache.ignite.internal.compute.ComputeMessageTypes; import org.apache.ignite.internal.compute.ComputeMessagesFactory; import org.apache.ignite.internal.compute.ComputeUtils; +import org.apache.ignite.internal.compute.ExecutionContext; import org.apache.ignite.internal.compute.ExecutionManager; -import org.apache.ignite.internal.compute.ExecutionOptions; import org.apache.ignite.internal.compute.JobStarter; import org.apache.ignite.internal.compute.events.ComputeEventMetadata; import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder; @@ -171,31 +171,20 @@ public class ComputeMessaging { * Submit Compute job to execution on remote node. * * @param remoteNode The job will be executed on this node. - * @param options Job execution options. - * @param units Deployment units. Can be empty. - * @param jobClassName Name of the job class to execute. - * @param metadataBuilder Event metadata builder. - * @param input Arguments of the job. + * @param executionContext Execution context. * @return Job id future that will be completed when the job is submitted on the remote node. */ - public CompletableFuture<UUID> remoteExecuteRequestAsync( - ClusterNode remoteNode, - ExecutionOptions options, - List<DeploymentUnit> units, - String jobClassName, - ComputeEventMetadataBuilder metadataBuilder, - @Nullable ComputeJobDataHolder input - ) { - List<DeploymentUnitMsg> deploymentUnitMsgs = units.stream() + public CompletableFuture<UUID> remoteExecuteRequestAsync(ClusterNode remoteNode, ExecutionContext executionContext) { + List<DeploymentUnitMsg> deploymentUnitMsgs = executionContext.units().stream() .map(ComputeUtils::toDeploymentUnitMsg) .collect(toList()); ExecuteRequestV2 executeRequest = messagesFactory.executeRequestV2() - .executeOptions(options) + .executeOptions(executionContext.options()) .deploymentUnits(deploymentUnitMsgs) - .jobClassName(jobClassName) - .metadataBuilder(metadataBuilder) - .input(input) + .jobClassName(executionContext.jobClassName()) + .metadataBuilder(executionContext.metadataBuilder()) + .input(executionContext.arg()) .build(); return invoke(remoteNode, executeRequest) @@ -209,7 +198,7 @@ public class ComputeMessaging { ? ((ExecuteRequestV2) request).metadataBuilder() : ComputeEventMetadata.builder(); - starter.start(request.executeOptions(), units, request.jobClassName(), metadataBuilder, request.input()) + starter.start(new ExecutionContext(request.executeOptions(), units, request.jobClassName(), metadataBuilder, request.input())) .whenComplete((execution, err) -> { if (err != null) { sendExecuteResponse(null, err, sender, correlationId); diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java index 441a51601b6..3ea5379a0d0 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java @@ -220,7 +220,8 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { CancelHandle cancelHandle = CancelHandle.create(); CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executionFut = computeComponent.executeLocally( - DEFAULT, List.of(), SimpleJob.class.getName(), ComputeEventMetadata.builder(), null, cancelHandle.token() + new ExecutionContext(DEFAULT, List.of(), SimpleJob.class.getName(), ComputeEventMetadata.builder(), null), + cancelHandle.token() ); assertFalse(infiniteFuture.isDone()); @@ -724,13 +725,16 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { } private CompletableFuture<String> executeLocally(List<DeploymentUnit> units, String jobClassName) { - return computeComponent.executeLocally(DEFAULT, units, jobClassName, ComputeEventMetadata.builder(), null, null) - .thenCompose(ComputeComponentImplTest::unwrapResult); + return computeComponent.executeLocally( + new ExecutionContext(DEFAULT, units, jobClassName, ComputeEventMetadata.builder(), null), + null + ).thenCompose(ComputeComponentImplTest::unwrapResult); } private JobExecution<ComputeJobDataHolder> executeLocally(String jobClassName, @Nullable CancellationToken cancellationToken) { CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executionFut = computeComponent.executeLocally( - DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null, cancellationToken + new ExecutionContext(DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null), + cancellationToken ); assertThat(executionFut, willCompleteSuccessfully()); return executionFut.join(); @@ -742,15 +746,20 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { @Nullable CancellationToken cancellationToken ) { CompletableFuture<CancellableJobExecution<ComputeJobDataHolder>> executionFut = computeComponent.executeRemotely( - remoteNode, DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), arg, cancellationToken + remoteNode, + new ExecutionContext(DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), arg), + cancellationToken ); assertThat(executionFut, willCompleteSuccessfully()); return executionFut.join(); } private CompletableFuture<String> executeRemotely(String jobClassName) { - return computeComponent.executeRemotely(remoteNode, DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null, null) - .thenCompose(ComputeComponentImplTest::unwrapResult); + return computeComponent.executeRemotely( + remoteNode, + new ExecutionContext(DEFAULT, List.of(), jobClassName, ComputeEventMetadata.builder(), null), + null + ).thenCompose(ComputeComponentImplTest::unwrapResult); } private static CompletableFuture<String> unwrapResult(JobExecution<ComputeJobDataHolder> execution) { diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java index 788c71fa0cb..7f6f5479f41 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doReturn; @@ -42,6 +43,7 @@ import static org.mockito.Mockito.when; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.BroadcastExecution; import org.apache.ignite.compute.BroadcastJobTarget; @@ -75,6 +77,7 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatcher; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Spy; @@ -143,8 +146,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { willBe("jobResponse") ); - verify(computeComponent) - .executeLocally(eq(ExecutionOptions.DEFAULT), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()); + verifyExecuteLocally(ExecutionOptions.DEFAULT); assertEquals(jobTimestamp, observableTimestampTracker.get()); } @@ -185,8 +187,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { willBe("jobResponse") ); - verify(computeComponent) - .executeLocally(eq(ExecutionOptions.DEFAULT), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()); + verifyExecuteLocally(ExecutionOptions.DEFAULT); assertEquals(jobTimestamp, observableTimestampTracker.get()); } @@ -222,8 +223,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { willBe("jobResponse") ); - verify(computeComponent) - .executeLocally(eq(expectedOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull()); + verifyExecuteLocally(expectedOptions); } @Test @@ -308,28 +308,38 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest { } private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions) { - when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull())) + when(computeComponent.executeLocally(argThat(ctxEq(executionOptions)), isNull())) .thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult( "jobResponse", null, jobTimestamp.longValue()), localNode))); } private void respondWhenExecutingSimpleJobLocally(ExecutionOptions executionOptions, CancellationToken token) { - when(computeComponent.executeLocally(eq(executionOptions), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), eq(token))) + when(computeComponent.executeLocally(argThat(ctxEq(executionOptions)), eq(token))) .thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult( "jobResponse", null, jobTimestamp.longValue()), localNode))); } private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions options) { when(computeComponent.executeRemotelyWithFailover( - eq(remoteNode), any(), eq(options), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull() + eq(remoteNode), any(), argThat(ctxEq(options)), isNull() )).thenReturn(completedFuture(completedExecution(SharedComputeUtils.marshalArgOrResult( "remoteResponse", null, jobTimestamp.longValue()), remoteNode))); } + private void verifyExecuteLocally(ExecutionOptions options) { + verify(computeComponent) + .executeLocally(argThat(ctxEq(options)), isNull()); + } + private void verifyExecuteRemotelyWithFailover(ExecutionOptions options) { - verify(computeComponent).executeRemotelyWithFailover( - eq(remoteNode), any(), eq(options), eq(testDeploymentUnits), eq(JOB_CLASS_NAME), any(), any(), isNull() - ); + verify(computeComponent) + .executeRemotelyWithFailover(eq(remoteNode), any(), argThat(ctxEq(options)), isNull()); + } + + private ArgumentMatcher<ExecutionContext> ctxEq(ExecutionOptions options) { + return ctx -> Objects.equals(ctx.options(), options) + && Objects.equals(ctx.units(), testDeploymentUnits) + && Objects.equals(ctx.jobClassName(), JOB_CLASS_NAME); } private static <R> CancellableJobExecution<R> completedExecution(@Nullable R result, ClusterNode node) {