This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eb05ff2934 IGNITE-24217 Running multiple compute jobs fails (#5057)
eb05ff2934 is described below

commit eb05ff29347bc5ab63c9284cac0ff4946407ebd6
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Jan 16 16:53:21 2025 +0300

    IGNITE-24217 Running multiple compute jobs fails (#5057)
---
 .../internal/deployunit/DefaultNodeCallback.java   |  52 ++--------
 .../deployunit/DeployMessagingService.java         |   6 +-
 .../internal/deployunit/DeploymentManagerImpl.java |  69 +++++++------
 .../internal/deployunit/DownloadTracker.java       |   5 +-
 .../ignite/internal/deployunit/UnitDownloader.java | 110 +++++++++++++++++++++
 .../ignite/internal/deployunit/UnitStatus.java     |   8 +-
 .../metastore/status/UnitClusterStatus.java        |   6 ++
 .../metastore/status/UnitNodeStatus.java           |   6 ++
 .../ignite/internal/compute/ItComputeBaseTest.java |  45 ++++-----
 .../internal/compute/ItComputeTestStandalone.java  |  47 ++++++++-
 10 files changed, 239 insertions(+), 115 deletions(-)

diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index 3b9a805cbb..748490a54e 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.deployunit;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -32,79 +31,42 @@ import 
org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 
 /**
  * Default implementation of {@link NodeEventCallback}.
  */
 public class DefaultNodeCallback extends NodeEventCallback {
-    private static final IgniteLogger LOG = 
Loggers.forClass(DefaultNodeCallback.class);
-
     private final DeploymentUnitStore deploymentUnitStore;
 
-    private final DeployMessagingService messaging;
-
-    private final FileDeployerService deployer;
-
     private final DeploymentUnitAcquiredWaiter undeployer;
 
-    private final DownloadTracker tracker;
+    private final UnitDownloader downloader;
 
     private final ClusterManagementGroupManager cmgManager;
 
-    private final String nodeName;
-
     /**
      * Constructor.
      *
      * @param deploymentUnitStore Deployment units store.
-     * @param messaging Deployment messaging service.
-     * @param deployer Deployment unit file system service.
      * @param undeployer Deployment unit undeployer.
+     * @param downloader Unit downloader.
      * @param cmgManager Cluster management group manager.
-     * @param nodeName Node consistent ID.
      */
-    public DefaultNodeCallback(
+    DefaultNodeCallback(
             DeploymentUnitStore deploymentUnitStore,
-            DeployMessagingService messaging,
-            FileDeployerService deployer,
             DeploymentUnitAcquiredWaiter undeployer,
-            DownloadTracker tracker,
-            ClusterManagementGroupManager cmgManager,
-            String nodeName
+            UnitDownloader downloader,
+            ClusterManagementGroupManager cmgManager
     ) {
         this.deploymentUnitStore = deploymentUnitStore;
-        this.messaging = messaging;
-        this.deployer = deployer;
         this.undeployer = undeployer;
-        this.tracker = tracker;
+        this.downloader = downloader;
         this.cmgManager = cmgManager;
-        this.nodeName = nodeName;
     }
 
     @Override
     public void onUploading(String id, Version version, List<UnitNodeStatus> 
holders) {
-        tracker.track(id, version,
-                () -> messaging.downloadUnitContent(id, version, new 
ArrayList<>(getDeployedNodeIds(holders)))
-                        .thenCompose(content -> {
-                            
org.apache.ignite.internal.deployunit.DeploymentUnit unit = 
UnitContent.toDeploymentUnit(content);
-                            return deployer.deploy(id, version, unit)
-                                    .whenComplete((deployed, err) -> {
-                                        try {
-                                            unit.close();
-                                        } catch (Exception e) {
-                                            LOG.error("Failed to close 
deployment unit", e);
-                                        }
-                                    });
-                        })
-                        .thenApply(deployed -> {
-                            if (deployed) {
-                                return 
deploymentUnitStore.updateNodeStatus(nodeName, id, version, DEPLOYED);
-                            }
-                            return deployed;
-                        })
-        );
+        downloader.downloadUnit(id, version, getDeployedNodeIds(holders));
     }
 
     @Override
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index f42602043e..97d98dff11 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.deployunit;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.deployment.version.Version;
@@ -109,7 +109,7 @@ public class DeployMessagingService {
      * @param nodes Nodes where unit deployed.
      * @return Downloaded deployment unit content.
      */
-    CompletableFuture<UnitContent> downloadUnitContent(String id, Version 
version, List<String> nodes) {
+    CompletableFuture<UnitContent> downloadUnitContent(String id, Version 
version, Collection<String> nodes) {
         DownloadUnitRequest request = messageFactory.downloadUnitRequest()
                 .id(id)
                 .version(version.render())
@@ -122,7 +122,7 @@ public class DeployMessagingService {
                 .thenApply(message -> ((DownloadUnitResponse) 
message).unitContent());
     }
 
-    private ClusterNode resolveClusterNode(List<String> nodes) {
+    private ClusterNode resolveClusterNode(Collection<String> nodes) {
         return nodes.stream().map(node -> 
clusterService.topologyService().getByConsistentId(node))
                 .filter(Objects::nonNull)
                 .findAny()
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 3183e5a7c6..465ea7ef2e 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
-import static 
org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
@@ -34,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
 import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
@@ -128,6 +129,8 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
 
     private final ClusterStatusWatchListener clusterStatusWatchListener;
 
+    private final UnitDownloader unitDownloader;
+
     /**
      * Constructor.
      *
@@ -161,16 +164,9 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                 unit -> deploymentUnitStore.updateNodeStatus(nodeName, 
unit.name(), unit.version(), REMOVING)
         );
         messaging = new DeployMessagingService(clusterService, cmgManager, 
deployer, tracker);
+        unitDownloader = new UnitDownloader(deploymentUnitStore, nodeName, 
deployer, tracker, messaging);
 
-        nodeStatusCallback = new DefaultNodeCallback(
-                deploymentUnitStore,
-                messaging,
-                deployer,
-                undeployer,
-                tracker,
-                cmgManager,
-                nodeName
-        );
+        nodeStatusCallback = new DefaultNodeCallback(deploymentUnitStore, 
undeployer, unitDownloader, cmgManager);
         nodeStatusWatchListener = new 
NodeStatusWatchListener(deploymentUnitStore, nodeName, nodeStatusCallback);
 
         clusterEventCallback = new 
ClusterEventCallbackImpl(deploymentUnitStore, deployer, cmgManager, nodeName);
@@ -353,30 +349,41 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
 
     @Override
     public CompletableFuture<Boolean> onDemandDeploy(String id, Version 
version) {
-        return deploymentUnitStore.getAllNodes(id, version)
-                .thenCompose(nodes -> {
-                    if (nodes.isEmpty()) {
+        return deploymentUnitStore.getAllNodeStatuses(id, version)
+                .thenCompose(statuses -> {
+                    if (statuses.isEmpty()) {
                         return falseCompletedFuture();
                     }
-                    if (nodes.contains(nodeName)) {
-                        return trueCompletedFuture();
-                    }
-                    return messaging.downloadUnitContent(id, version, nodes)
-                            .thenCompose(content -> {
-                                return 
deploymentUnitStore.getClusterStatus(id, version)
-                                        .thenCompose(status -> {
-                                            DeploymentUnit unit = 
toDeploymentUnit(content);
-                                            return deployToLocalNode(status, 
unit)
-                                                    .whenComplete((deployed, 
throwable) -> {
-                                                        try {
-                                                            unit.close();
-                                                        } catch (Exception e) {
-                                                            LOG.error("Error 
closing deployment unit", e);
-                                                        }
-                                                    });
-                                        });
-                            });
 
+                    Optional<UnitNodeStatus> nodeStatus = statuses.stream()
+                            .filter(status -> status.nodeId().equals(nodeName))
+                            .findFirst();
+
+                    if (nodeStatus.isPresent()) {
+                        switch (nodeStatus.get().status()) {
+                            case UPLOADING:
+                                // Wait for the upload
+                                LOG.debug("Status is UPLOADING, downloading 
the unit");
+                                return unitDownloader.downloadUnit(statuses, 
id, version);
+                            case DEPLOYED:
+                                // Unit is already deployed on the local node.
+                                LOG.debug("Status is DEPLOYED");
+                                return trueCompletedFuture();
+                            default:
+                                // Invalid status, cluster status should be 
deployed
+                                LOG.debug("Invalid status {}", 
nodeStatus.get().status());
+                                return falseCompletedFuture();
+                        }
+                    } else {
+                        // Node was not in the initial deploy list, create 
status in the UPLOADING state and wait for the upload.
+                        return deploymentUnitStore.getClusterStatus(id, 
version).thenCompose(clusterStatus ->
+                                deploymentUnitStore.createNodeStatus(nodeName, 
id, version, clusterStatus.opId(), UPLOADING)
+                                        .thenCompose(created -> {
+                                            LOG.debug("Status {}, downloading 
the unit", created ? "created" : "not created");
+                                            return 
unitDownloader.downloadUnit(statuses, id, version);
+                                        })
+                        );
+                    }
                 });
     }
 
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
index d5cf07dfe6..25ef8458a2 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
@@ -44,7 +44,10 @@ public class DownloadTracker {
      */
     public <T> CompletableFuture<T> track(String id, Version version, 
Supplier<CompletableFuture<T>> trackableAction) {
         ClusterStatusKey key = 
ClusterStatusKey.builder().id(id).version(version).build();
-        return (CompletableFuture<T>) inFlightFutures.computeIfAbsent(key, k 
-> trackableAction.get());
+        return ((CompletableFuture<T>) inFlightFutures.computeIfAbsent(key,
+                k -> trackableAction.get()
+                        .whenComplete((result, throwable) -> 
inFlightFutures.remove(key))
+        ));
     }
 
     /**
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
new file mode 100644
index 0000000000..a1c472710e
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static 
org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * Unit downloader.
+ */
+class UnitDownloader {
+    private static final IgniteLogger LOG = 
Loggers.forClass(UnitDownloader.class);
+
+    private final DeploymentUnitStore deploymentUnitStore;
+
+    private final String nodeName;
+
+    private final FileDeployerService deployer;
+
+    private final DownloadTracker tracker;
+
+    private final DeployMessagingService messaging;
+
+    UnitDownloader(
+            DeploymentUnitStore deploymentUnitStore,
+            String nodeName,
+            FileDeployerService deployer,
+            DownloadTracker tracker,
+            DeployMessagingService messaging
+    ) {
+        this.deploymentUnitStore = deploymentUnitStore;
+        this.nodeName = nodeName;
+        this.deployer = deployer;
+        this.tracker = tracker;
+        this.messaging = messaging;
+    }
+
+    /**
+     * Downloads specified unit from any node where this unit is deployed from 
the specified collection of nodes to the local node, deploys
+     * it and sets the node status to {@link DeploymentStatus#DEPLOYED}.
+     *
+     * @param statuses Collection of all node statuses for this unit.
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     */
+    CompletableFuture<Boolean> downloadUnit(Collection<UnitNodeStatus> 
statuses, String id, Version version) {
+        List<String> deployedNodes = statuses.stream()
+                .filter(status -> status.status() == DEPLOYED)
+                .map(UnitNodeStatus::nodeId)
+                .collect(Collectors.toList());
+
+        return downloadUnit(id, version, deployedNodes);
+    }
+
+    /**
+     * Downloads specified unit from any node from the specified collection of 
nodes to the local node, deploys it and sets the node status
+     * to {@link DeploymentStatus#DEPLOYED}.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @param nodes Nodes where the unit is deployed.
+     */
+    CompletableFuture<Boolean> downloadUnit(String id, Version version, 
Collection<String> nodes) {
+        return tracker.track(id, version, () -> 
messaging.downloadUnitContent(id, version, nodes)
+                .thenCompose(content -> {
+                    DeploymentUnit unit = toDeploymentUnit(content);
+                    return deployer.deploy(id, version, unit)
+                            .whenComplete((deployed, throwable) -> {
+                                try {
+                                    unit.close();
+                                } catch (Exception e) {
+                                    LOG.error("Error closing deployment unit", 
e);
+                                }
+                            });
+                })
+                .thenCompose(deployed -> {
+                    if (deployed) {
+                        return deploymentUnitStore.updateNodeStatus(nodeName, 
id, version, DEPLOYED);
+                    }
+                    return falseCompletedFuture();
+                })
+        );
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
index ef26699e53..68a94283a9 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.deployunit;
 import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.tostring.S;
 
 /**
  * Unit meta data class.
@@ -122,11 +123,6 @@ public abstract class UnitStatus {
 
     @Override
     public String toString() {
-        return "UnitStatus{"
-                + "id='" + id + '\''
-                + ", version=" + version
-                + ", status=" + status
-                + ", opId=" + opId
-                + '}';
+        return S.toString(UnitStatus.class, this);
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
index cc09aa6a8e..df99f24233 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -87,6 +88,11 @@ public class UnitClusterStatus extends UnitStatus {
         return result;
     }
 
+    @Override
+    public String toString() {
+        return S.toString(UnitClusterStatus.class, this, super.toString());
+    }
+
     /**
      * Serialize unit cluster status to byte array.
      *
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
index ace6ca68f8..2ffe8c9e30 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -77,6 +78,11 @@ public class UnitNodeStatus extends UnitStatus {
         return result;
     }
 
+    @Override
+    public String toString() {
+        return S.toString(UnitNodeStatus.class, this, super.toString());
+    }
+
     /**
      * Serialize unit node status to byte array.
      *
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 93eaa645a5..a67692a59d 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -275,7 +275,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
 
         IgniteException ex = assertThrows(IgniteException.class, () -> 
compute().execute(
                 JobTarget.node(clusterNode(entryNode)),
-                
JobDescriptor.builder(failingJobClassName()).units(units()).build(), null));
+                
JobDescriptor.builder(failingJobClass()).units(units()).build(), null));
 
         assertComputeException(ex, "JobException", "Oops");
     }
@@ -286,7 +286,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
 
         JobExecution<String> execution = submit(
                 JobTarget.node(clusterNode(entryNode)),
-                JobDescriptor.<Object, 
String>builder(failingJobClassName()).units(units()).build(),
+                
JobDescriptor.builder(failingJobClass()).units(units()).build(),
                 null
         );
 
@@ -301,7 +301,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     void executesFailingJobOnRemoteNodes() {
         IgniteException ex = assertThrows(IgniteException.class, () -> 
compute().execute(
                 JobTarget.anyNode(clusterNode(node(1)), clusterNode(node(2))),
-                
JobDescriptor.builder(failingJobClassName()).units(units()).build(), null));
+                
JobDescriptor.builder(failingJobClass()).units(units()).build(), null));
 
         assertComputeException(ex, "JobException", "Oops");
     }
@@ -323,7 +323,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     void executesFailingJobOnRemoteNodesAsync() {
         JobExecution<String> execution = submit(
                 JobTarget.anyNode(clusterNode(node(1)), clusterNode(node(2))),
-                JobDescriptor.<Object, 
String>builder(failingJobClassName()).units(units()).build(),
+                
JobDescriptor.builder(failingJobClass()).units(units()).build(),
                 null
         );
 
@@ -376,7 +376,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     void broadcastsFailingJob() {
         BroadcastExecution<String> broadcastExecution = submit(
                 Set.of(clusterNode(node(0)), clusterNode(node(1)), 
clusterNode(node(2))),
-                JobDescriptor.<Object, 
String>builder(failingJobClassName()).units(units()).build(),
+                
JobDescriptor.builder(failingJobClass()).units(units()).build(),
                 null
         );
 
@@ -435,7 +435,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         var ex = assertThrows(CompletionException.class,
                 () -> compute().submitAsync(
                         JobTarget.colocated("BAD_TABLE", 
Tuple.create(Map.of("k", 1))),
-                        
JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build(),
+                        
JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(),
                         null
                 ).join()
         );
@@ -568,10 +568,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
 
     @Test
     void submitMapReduce() {
-        List<DeploymentUnit> units = units();
-        @Nullable List<DeploymentUnit> arg = units();
         TaskExecution<Integer> taskExecution = compute().submitMapReduce(
-                TaskDescriptor.<List<DeploymentUnit>, 
Integer>builder(mapReduceTaskClassName()).units(units).build(), arg);
+                
TaskDescriptor.builder(mapReduceTaskClass()).units(units()).build(),
+                units()
+        );
 
         int sumOfNodeNamesLengths = 
CLUSTER.runningNodes().map(Ignite::name).map(String::length).reduce(Integer::sum).orElseThrow();
         assertThat(taskExecution.resultAsync(), willBe(sumOfNodeNamesLengths));
@@ -587,8 +587,9 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
     @Test
     void executeMapReduceAsync() {
         CompletableFuture<Integer> future = compute().executeMapReduceAsync(
-                TaskDescriptor.<List<DeploymentUnit>, 
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
-                units());
+                
TaskDescriptor.builder(mapReduceTaskClass()).units(units()).build(),
+                units()
+        );
 
         int sumOfNodeNamesLengths = 
CLUSTER.runningNodes().map(Ignite::name).map(String::length).reduce(Integer::sum).orElseThrow();
         assertThat(future, willBe(sumOfNodeNamesLengths));
@@ -596,9 +597,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
 
     @Test
     void executeMapReduce() {
-        int result = compute().executeMapReduce(
-                TaskDescriptor.<List<DeploymentUnit>, 
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
-                units());
+        int result = 
compute().executeMapReduce(TaskDescriptor.builder(mapReduceTaskClass()).units(units()).build(),
 units());
 
         int sumOfNodeNamesLengths = 
CLUSTER.runningNodes().map(Ignite::name).map(String::length).reduce(Integer::sum).orElseThrow();
         assertThat(result, is(sumOfNodeNamesLengths));
@@ -752,28 +751,20 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         ).map(Arguments::of);
     }
 
-    static String toStringJobClassName() {
-        return ToStringJob.class.getName();
-    }
-
-    private static Class<ToStringJob> toStringJobClass() {
+    static Class<ToStringJob> toStringJobClass() {
         return ToStringJob.class;
     }
 
-    private static String getNodeNameJobClassName() {
-        return GetNodeNameJob.class.getName();
-    }
-
     private static Class<GetNodeNameJob> getNodeNameJobClass() {
         return GetNodeNameJob.class;
     }
 
-    private static String failingJobClassName() {
-        return FailingJob.class.getName();
+    private static Class<FailingJob> failingJobClass() {
+        return FailingJob.class;
     }
 
-    private static String mapReduceTaskClassName() {
-        return MapReduce.class.getName();
+    private static Class<MapReduce> mapReduceTaskClass() {
+        return MapReduce.class;
     }
 
     static void assertComputeException(Exception ex, Throwable cause) {
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 8159eb7b74..27fe655a72 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -20,20 +20,29 @@ package org.apache.ignite.internal.compute;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.oneOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.JobDescriptor;
@@ -56,6 +65,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
 
     private final List<DeploymentUnit> units = List.of(unit);
 
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        return new int[] { 0, 1 }; // Majority will be 0, 1
+    }
+
     @BeforeEach
     void deploy() throws IOException {
         deployJar(node(0), unit.name(), unit.version(), 
"ignite-integration-test-jobs-1.0-SNAPSHOT.jar");
@@ -81,6 +95,34 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
         return units;
     }
 
+    @Test
+    void executeMultipleRemoteJobs() {
+        IgniteImpl ignite = unwrapIgniteImpl(node(0));
+        CompletableFuture<Set<String>> majority = 
ignite.clusterManagementGroupManager().majority();
+        assertThat(majority, will(contains(node(0).name(), node(1).name())));
+
+        
assertThat(unwrapIgniteImpl(node(0)).deployment().nodeStatusAsync(unit.name(), 
unit.version()), willBe(DEPLOYED));
+        
assertThat(unwrapIgniteImpl(node(1)).deployment().nodeStatusAsync(unit.name(), 
unit.version()), willBe(oneOf(UPLOADING, DEPLOYED)));
+        
assertThat(unwrapIgniteImpl(node(2)).deployment().nodeStatusAsync(unit.name(), 
unit.version()), willBe(nullValue()));
+
+        // Execute concurrently on majority, unit should be in the uploading 
or deployed state at this point
+        List<CompletableFuture<String>> resultsMajority = IntStream.range(0, 
3).mapToObj(i -> compute().executeAsync(
+                JobTarget.node(clusterNode(1)),
+                
JobDescriptor.builder(toStringJobClass()).units(units()).build(),
+                42
+        )).collect(Collectors.toList());
+
+        // Execute concurrently on non-majority, unit is missing, will trigger 
on-demand deploy
+        List<CompletableFuture<String>> resultsMissing = IntStream.range(0, 
3).mapToObj(i -> compute().executeAsync(
+                JobTarget.node(clusterNode(2)),
+                
JobDescriptor.builder(toStringJobClass()).units(units()).build(),
+                42
+        )).collect(Collectors.toList());
+
+        assertThat(resultsMajority, everyItem(will(equalTo("42"))));
+        assertThat(resultsMissing, everyItem(will(equalTo("42"))));
+    }
+
     @Test
     void executesJobWithNonExistingUnit() {
         Ignite entryNode = node(0);
@@ -88,8 +130,9 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
         List<DeploymentUnit> nonExistingUnits = List.of(new 
DeploymentUnit("non-existing", "1.0.0"));
         CompletableFuture<String> result = entryNode.compute().executeAsync(
                 JobTarget.node(clusterNode(entryNode)),
-                JobDescriptor.<Object[], 
String>builder(toStringJobClassName()).units(nonExistingUnits).build(),
-                null);
+                
JobDescriptor.builder(toStringJobClass()).units(nonExistingUnits).build(),
+                null
+        );
 
         CompletionException ex0 = assertThrows(CompletionException.class, 
result::join);
 


Reply via email to