This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d3a9ad69e IGNITE-19708 Check refcounter of unit before undeploy (#2219)
5d3a9ad69e is described below

commit 5d3a9ad69e356646d7c4b8e7c8315b0ab98b3e61
Author: Ivan Gagarkin <[email protected]>
AuthorDate: Tue Jun 27 19:26:54 2023 +0400

    IGNITE-19708 Check refcounter of unit before undeploy (#2219)
---
 .../internal/deployunit/DefaultNodeCallback.java   |   9 +-
 .../internal/deployunit/DeploymentManagerImpl.java |  32 ++++-
 .../deployunit/DeploymentUnitAccessor.java         |   8 +-
 .../deployunit/DeploymentUnitAccessorImpl.java     |  39 ++++--
 .../deployunit/DeploymentUnitAcquiredWaiter.java   | 103 ++++++++++++++
 .../deployunit/DeploymentUnitAccessorImplTest.java |  53 +++++++
 .../DeploymentUnitAcquiredWaiterTest.java          | 155 +++++++++++++++++++++
 .../compute/ClassLoaderExceptionsMapper.java       |   8 +-
 modules/compute/src/test/README.md                 |   6 +-
 .../compute/ClassLoaderExceptionsMapperTest.java   |   4 +-
 ...OT-src.zip => ignite-jobs-1.0-SNAPSHOT-src.zip} | Bin
 .../unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar    | Bin 0 -> 1749 bytes
 .../units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar       | Bin 1749 -> 0 bytes
 .../unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar    | Bin 0 -> 1724 bytes
 .../units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar       | Bin 1724 -> 0 bytes
 .../unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar    | Bin 0 -> 1749 bytes
 .../unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar    | Bin 0 -> 1724 bytes
 .../units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar       | Bin 1749 -> 0 bytes
 .../units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar       | Bin 1724 -> 0 bytes
 .../unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar    | Bin 0 -> 1749 bytes
 .../3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar   | Bin 0 -> 1724 bytes
 .../unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar      | Bin 1724 -> 0 bytes
 .../units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar       | Bin 1749 -> 0 bytes
 .../unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar    | Bin 0 -> 1749 bytes
 .../units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar       | Bin 1749 -> 0 bytes
 .../unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar    | Bin 0 -> 1724 bytes
 .../units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar       | Bin 1724 -> 0 bytes
 .../cpp/tests/client-test/compute_test.cpp         |   8 +-
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |   4 +-
 .../internal/compute/ItComputeTestStandalone.java  |  49 ++++++-
 .../runner/app/client/ItThinClientComputeTest.java |   4 +-
 .../units/ignite-it-jobs-1.0-SNAPSHOT.jar          | Bin 0 -> 5528 bytes
 ...OT-src.zip => ignite-jobs-1.0-SNAPSHOT-src.zip} | Bin
 .../resources/units/ignite-jobs-1.0-SNAPSHOT.jar   | Bin 4773 -> 0 bytes
 .../units/ignite-ut-job1-1.0-SNAPSHOT.jar          | Bin 0 -> 1749 bytes
 .../units/ignite-ut-job2-1.0-SNAPSHOT.jar          | Bin 0 -> 1724 bytes
 .../resources/units/unit1-1.0-SNAPSHOT.jar         | Bin 1749 -> 0 bytes
 .../resources/units/unit2-1.0-SNAPSHOT.jar         | Bin 1724 -> 0 bytes
 38 files changed, 444 insertions(+), 38 deletions(-)

diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index a84f1e858a..d657227a5e 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.version.Version;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -42,6 +43,8 @@ public class DefaultNodeCallback extends NodeEventCallback {
 
     private final FileDeployerService deployer;
 
+    private final DeploymentUnitAcquiredWaiter undeployer;
+
     private final DownloadTracker tracker;
 
     private final ClusterManagementGroupManager cmgManager;
@@ -54,6 +57,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
      * @param deploymentUnitStore Deployment units store.
      * @param messaging Deployment messaging service.
      * @param deployer Deployment unit file system service.
+     * @param undeployer Deployment unit undeployer.
      * @param cmgManager Cluster management group manager.
      * @param nodeName Node consistent ID.
      */
@@ -61,6 +65,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
             DeploymentUnitStore deploymentUnitStore,
             DeployMessagingService messaging,
             FileDeployerService deployer,
+            DeploymentUnitAcquiredWaiter undeployer,
             DownloadTracker tracker,
             ClusterManagementGroupManager cmgManager,
             String nodeName
@@ -68,6 +73,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
         this.deploymentUnitStore = deploymentUnitStore;
         this.messaging = messaging;
         this.deployer = deployer;
+        this.undeployer = undeployer;
         this.tracker = tracker;
         this.cmgManager = cmgManager;
         this.nodeName = nodeName;
@@ -102,8 +108,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
 
     @Override
     public void onObsolete(String id, Version version, List<UnitNodeStatus> 
holders) {
-        //TODO: IGNITE-19708
-        deploymentUnitStore.updateNodeStatus(nodeName, id, version, REMOVING);
+        undeployer.submitToAcquireRelease(new DeploymentUnit(id, version));
     }
 
     @Override
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 9e7fd0a99f..ba657e6104 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -22,15 +22,18 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -60,6 +63,11 @@ import org.jetbrains.annotations.Nullable;
 public class DeploymentManagerImpl implements IgniteDeployment {
     private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
 
+    /**
+     * Delay for undeployer.
+     */
+    private static final Duration UNDEPLOYER_DELAY = Duration.ofSeconds(5);
+
     /**
      * Node working directory.
      */
@@ -105,6 +113,8 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
      */
     private final DeploymentUnitAccessor deploymentUnitAccessor;
 
+    private final DeploymentUnitAcquiredWaiter undeployer;
+
     private final String nodeName;
 
     private final NodeEventCallback nodeStatusCallback;
@@ -138,14 +148,26 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         this.configuration = configuration;
         this.cmgManager = cmgManager;
         this.workDir = workDir;
+        this.nodeName = nodeName;
         tracker = new DownloadTracker();
         deployer = new FileDeployerService();
-        messaging = new DeployMessagingService(clusterService, cmgManager, 
deployer, tracker);
         deploymentUnitAccessor = new DeploymentUnitAccessorImpl(deployer);
+        undeployer = new DeploymentUnitAcquiredWaiter(
+                nodeName,
+                deploymentUnitAccessor,
+                unit -> deploymentUnitStore.updateNodeStatus(nodeName, 
unit.name(), unit.version(), REMOVING)
+        );
+        messaging = new DeployMessagingService(clusterService, cmgManager, 
deployer, tracker);
 
-        this.nodeName = nodeName;
-
-        nodeStatusCallback = new DefaultNodeCallback(deploymentUnitStore, 
messaging, deployer, tracker, cmgManager, nodeName);
+        nodeStatusCallback = new DefaultNodeCallback(
+                deploymentUnitStore,
+                messaging,
+                deployer,
+                undeployer,
+                tracker,
+                cmgManager,
+                nodeName
+        );
         nodeStatusWatchListener = new 
NodeStatusWatchListener(deploymentUnitStore, nodeName, nodeStatusCallback);
 
         clusterEventCallback = new 
ClusterEventCallbackImpl(deploymentUnitStore, deployer, cmgManager, nodeName);
@@ -373,6 +395,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         
deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener);
         messaging.subscribe();
         failover.registerTopologyChangeCallback(nodeStatusCallback, 
clusterEventCallback);
+        undeployer.start(UNDEPLOYER_DELAY.getSeconds(), TimeUnit.SECONDS);
     }
 
     @Override
@@ -381,6 +404,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         tracker.cancelAll();
         
deploymentUnitStore.unregisterNodeStatusListener(nodeStatusWatchListener);
         
deploymentUnitStore.unregisterClusterStatusListener(clusterStatusWatchListener);
+        undeployer.stop();
     }
 
     private static void checkId(String id) {
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
index feae04bc88..cf384d391a 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.deployunit;
 
+import java.util.function.Consumer;
 import org.apache.ignite.compute.DeploymentUnit;
 
 /**
@@ -33,10 +34,11 @@ public interface DeploymentUnitAccessor {
     DisposableDeploymentUnit acquire(DeploymentUnit unit);
 
     /**
-     * Checks if the deployment unit is acquired.
+     * Executes the consumer if the deployment unit is not acquired.
      *
      * @param unit Deployment unit.
-     * @return {@code true} if the deployment unit is acquired.
+     * @param consumer Consumer.
+     * @return {@code true} if the consumer was executed.
      */
-    boolean isAcquired(DeploymentUnit unit);
+    boolean computeIfNotAcquired(DeploymentUnit unit, Consumer<DeploymentUnit> 
consumer);
 }
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
index 0078c84e94..368bcce408 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.deployunit;
 
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.internal.util.RefCountedObjectPool;
 
@@ -26,6 +30,8 @@ import org.apache.ignite.internal.util.RefCountedObjectPool;
 public class DeploymentUnitAccessorImpl implements DeploymentUnitAccessor {
     private final RefCountedObjectPool<DeploymentUnit, 
DisposableDeploymentUnit> pool = new RefCountedObjectPool<>();
 
+    private final RefCountedObjectPool<DeploymentUnit, Lock> locks = new 
RefCountedObjectPool<>();
+
     private final FileDeployerService deployer;
 
     public DeploymentUnitAccessorImpl(FileDeployerService deployer) {
@@ -37,19 +43,36 @@ public class DeploymentUnitAccessorImpl implements 
DeploymentUnitAccessor {
      */
     @Override
     public DisposableDeploymentUnit acquire(DeploymentUnit unit) {
-        return pool.acquire(unit, ignored -> new DisposableDeploymentUnit(
-                        unit,
-                        deployer.unitPath(unit.name(), unit.version(), true),
-                        () -> pool.release(unit)
-                )
-        );
+        return executeWithLock(unit, it -> pool.acquire(it, ignored -> new 
DisposableDeploymentUnit(
+                it,
+                deployer.unitPath(it.name(), it.version(), true),
+                () -> pool.release(it)
+        )));
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public boolean isAcquired(DeploymentUnit unit) {
-        return pool.isAcquired(unit);
+    public boolean computeIfNotAcquired(DeploymentUnit unit, 
Consumer<DeploymentUnit> consumer) {
+        return executeWithLock(unit, it -> {
+            if (pool.isAcquired(it)) {
+                return false;
+            } else {
+                consumer.accept(it);
+                return true;
+            }
+        });
+    }
+
+    private <O> O executeWithLock(DeploymentUnit unit, 
Function<DeploymentUnit, O> function) {
+        Lock lock = locks.acquire(unit, ignored -> new ReentrantLock());
+        lock.lock();
+        try {
+            return function.apply(unit);
+        } finally {
+            lock.unlock();
+            locks.release(unit);
+        }
     }
 }
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiter.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiter.java
new file mode 100644
index 0000000000..b754275eea
--- /dev/null
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+
+/**
+ * Executes action on the deployment unit if it is not acquired. Otherwise, 
puts it to the queue.
+ */
+class DeploymentUnitAcquiredWaiter {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentUnitAcquiredWaiter.class);
+
+    /** Deployment units to undeploy. */
+    private final Queue<DeploymentUnit> queue = new ConcurrentLinkedDeque<>();
+
+    /** Deployment unit accessor. */
+    private final DeploymentUnitAccessor deploymentUnitAccessor;
+
+    /** Executor. */
+    private final ScheduledExecutorService executor;
+
+    /** Action. */
+    private final Consumer<DeploymentUnit> action;
+
+    /**
+     * Creates processor.
+     *
+     * @param nodeName node name.
+     * @param deploymentUnitAccessor deployment unit accessor.
+     * @param action action.
+     */
+    DeploymentUnitAcquiredWaiter(
+            String nodeName,
+            DeploymentUnitAccessor deploymentUnitAccessor,
+            Consumer<DeploymentUnit> action) {
+        this.deploymentUnitAccessor = deploymentUnitAccessor;
+        this.executor = Executors.newScheduledThreadPool(
+                1, NamedThreadFactory.create(nodeName, 
"deployment-unit-acquired-waiter", LOG));
+        this.action = action;
+    }
+
+    /**
+     * Starts the processor.
+     *
+     * @param delay delay between undeploy attempts.
+     * @param unit time unit of the delay.
+     */
+    public void start(long delay, TimeUnit unit) {
+        executor.scheduleWithFixedDelay(this::processQueue, 0, delay, unit);
+    }
+
+    /**
+     * Stops the processor.
+     */
+    public void stop() {
+        executor.shutdown();
+    }
+
+    /**
+     * Processes all deployment units in the queue.
+     */
+    private void processQueue() {
+        int size = queue.size();
+        for (int i = 0; i < size; i++) {
+            submitToAcquireRelease(queue.remove());
+        }
+    }
+
+    /**
+     * Executes the action on the deployment unit if it is not acquired. 
Otherwise, puts it to the queue.
+     *
+     * @param unit deployment unit to undeploy.
+     */
+    public void submitToAcquireRelease(DeploymentUnit unit) {
+        if (!deploymentUnitAccessor.computeIfNotAcquired(unit, action)) {
+            queue.offer(unit);
+        }
+    }
+}
diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImplTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImplTest.java
new file mode 100644
index 0000000000..63d9c0b3e1
--- /dev/null
+++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImplTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.compute.DeploymentUnit;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DeploymentUnitAccessorImplTest {
+
+    @Mock
+    private FileDeployerService deployerService;
+
+    @InjectMocks
+    private DeploymentUnitAccessorImpl deploymentUnitAccessor;
+
+    @Test
+    void computeIfNotAcquired() {
+        DeploymentUnit unit = new DeploymentUnit("unit", "1.0.0");
+        DisposableDeploymentUnit disposableDeploymentUnit1 = 
deploymentUnitAccessor.acquire(unit);
+        DisposableDeploymentUnit disposableDeploymentUnit2 = 
deploymentUnitAccessor.acquire(unit);
+
+        assertFalse(deploymentUnitAccessor.computeIfNotAcquired(unit, ignored 
-> {}));
+
+        disposableDeploymentUnit1.release();
+        assertFalse(deploymentUnitAccessor.computeIfNotAcquired(unit, ignored 
-> {}));
+
+        disposableDeploymentUnit2.release();
+        assertTrue(deploymentUnitAccessor.computeIfNotAcquired(unit, ignored 
-> {}));
+    }
+}
diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiterTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiterTest.java
new file mode 100644
index 0000000000..6622a59d4d
--- /dev/null
+++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.util.function.Predicate.isEqual;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DeploymentUnitAcquiredWaiterTest {
+    private static final int DELAY_IN_MILLIS = 500;
+
+    private final Set<DeploymentUnit> removingUnits = new 
CopyOnWriteArraySet<>();
+
+    @Mock
+    private FileDeployerService deployerService;
+
+    @Spy
+    @InjectMocks
+    private DeploymentUnitAccessorImpl deploymentUnitAccessor;
+
+    private DeploymentUnitAcquiredWaiter undeployer;
+
+    @BeforeEach
+    void setUp() {
+        undeployer = new DeploymentUnitAcquiredWaiter(
+                "testNode",
+                deploymentUnitAccessor,
+                removingUnits::add
+        );
+        undeployer.start(DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    void undeployNotAcquiredUnits() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+        DeploymentUnit unit2 = new DeploymentUnit("unit2", "1.0.0");
+        DeploymentUnit unit3 = new DeploymentUnit("unit3", "1.0.0");
+
+        undeployer.submitToAcquireRelease(unit1);
+        undeployer.submitToAcquireRelease(unit2);
+        undeployer.submitToAcquireRelease(unit3);
+
+        // check all units are removed instantly.
+        assertThat(removingUnits, contains(unit1, unit2, unit3));
+    }
+
+    @Test
+    void undeployAcquiredUnits() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+        DeploymentUnit unit2 = new DeploymentUnit("unit2", "1.0.0");
+        DeploymentUnit unit3 = new DeploymentUnit("unit3", "1.0.0");
+
+        // all units are acquired and will not be released in the future.
+        deploymentUnitAccessor.acquire(unit1);
+        deploymentUnitAccessor.acquire(unit2);
+        deploymentUnitAccessor.acquire(unit3);
+
+        undeployer.submitToAcquireRelease(unit1);
+        undeployer.submitToAcquireRelease(unit2);
+        undeployer.submitToAcquireRelease(unit3);
+
+        // check all units are still not removed.
+        await().during(DELAY_IN_MILLIS * 5, TimeUnit.MILLISECONDS).until(
+                () -> removingUnits.contains(unit1) && 
removingUnits.contains(unit2) && removingUnits.contains(unit3),
+                isEqual(false)
+        );
+    }
+
+    @Test
+    void undeployReleasedUnits() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+        DeploymentUnit unit2 = new DeploymentUnit("unit2", "1.0.0");
+        DeploymentUnit unit3 = new DeploymentUnit("unit3", "1.0.0");
+
+        // unit1 and unit3 will be released in the future.
+        DisposableDeploymentUnit deploymentUnit1 = 
deploymentUnitAccessor.acquire(unit1);
+        DisposableDeploymentUnit deploymentUnit2 = 
deploymentUnitAccessor.acquire(unit2);
+        DisposableDeploymentUnit deploymentUnit3 = 
deploymentUnitAccessor.acquire(unit3);
+
+        undeployer.submitToAcquireRelease(unit1);
+        undeployer.submitToAcquireRelease(unit2);
+        undeployer.submitToAcquireRelease(unit3);
+
+        verify(deploymentUnitAccessor, 
atLeastOnce()).computeIfNotAcquired(eq(unit1), any());
+        verify(deploymentUnitAccessor, 
atLeastOnce()).computeIfNotAcquired(eq(unit2), any());
+        verify(deploymentUnitAccessor, 
atLeastOnce()).computeIfNotAcquired(eq(unit3), any());
+
+        assertThat(removingUnits, emptyIterable());
+
+        deploymentUnit1.release();
+        deploymentUnit3.release();
+
+        // check unit1 and unit3 were removed.
+        await().timeout(DELAY_IN_MILLIS * 4, TimeUnit.MILLISECONDS).until(() 
-> removingUnits.contains(unit1));
+        await().timeout(DELAY_IN_MILLIS * 4, TimeUnit.MILLISECONDS).until(() 
-> removingUnits.contains(unit3));
+
+        // check unit2 is still not removed.
+        await().during(DELAY_IN_MILLIS * 4, TimeUnit.MILLISECONDS)
+                .until(() -> removingUnits.contains(unit2), isEqual(false));
+    }
+
+    @Test
+    void notInfinityLoop() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+
+        deploymentUnitAccessor.acquire(unit1);
+
+        undeployer.submitToAcquireRelease(unit1);
+
+        // check delay between attempts to undeploy the unit.
+        verify(deploymentUnitAccessor, after(DELAY_IN_MILLIS * 5).atMost(6))
+                .computeIfNotAcquired(eq(unit1), any());
+    }
+
+    @AfterEach
+    void tearDown() {
+        undeployer.stop();
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
index af861c49f0..182dcddc5d 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
@@ -24,13 +24,13 @@ import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundExc
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitUnavailableException;
 
 class ClassLoaderExceptionsMapper {
-    // <class_fqdn>. Deployment unit <deployment_unit_id_and ver> doesn’t 
exist.
-    private static final String DEPLOYMENT_UNIT_DOES_NOT_EXIST_MSG = "%s. 
Deployment unit %s:%s doesn’t exist";
+    // <class_fqdn>. Deployment unit <deployment_unit_id_and ver> doesn't 
exist.
+    private static final String DEPLOYMENT_UNIT_DOES_NOT_EXIST_MSG = "%s. 
Deployment unit %s:%s doesn't exist";
 
 
-    // <class_fqdn>. Deployment unit <deployment_unit_id> can’t be used:
+    // <class_fqdn>. Deployment unit <deployment_unit_id> can't be used:
     // [clusterStatus = <clusterDURecord.status>, nodeStatus = 
<nodeDURecord.status>].
-    private static final String DEPLOYMENT_UNIT_NOT_AVAILABLE_MSG = "%s. 
Deployment unit %s:%s can’t be used: "
+    private static final String DEPLOYMENT_UNIT_NOT_AVAILABLE_MSG = "%s. 
Deployment unit %s:%s can't be used: "
             + "[clusterStatus = %s, nodeStatus = %s]";
 
     static CompletableFuture<JobContext> mapClassLoaderExceptions(
diff --git a/modules/compute/src/test/README.md b/modules/compute/src/test/README.md
index 1ca0cd592a..56eda0dcfb 100644
--- a/modules/compute/src/test/README.md
+++ b/modules/compute/src/test/README.md
@@ -1,15 +1,15 @@
 # ignite-compute
 
 ## Unit tests
-[apache-ignite-1.0-SNAPSHOT-src.zip](resources%2Funits%2Fapache-ignite-1.0-SNAPSHOT-src.zip)
 contains a zip archive with a 
+[ignite-jobs-1.0-SNAPSHOT-src.zip](resources%2Funits%2Fignite-jobs-1.0-SNAPSHOT-src.zip)
 contains a zip archive with a 
 test project which was used to create jars for 
`org.apache.ignite.internal.compute.loader.JobClassLoaderFactoryTest` tests.
 
-[unit1-1.0-SNAPSHOT.jar](resources%2Funits%2Funit2%2F1.0.0%2Funit1-1.0-SNAPSHOT.jar)
 contains two classes 
+[ignite-ut-job1-1.0-SNAPSHOT.jar](resources%2Funits%2Funit1%2F1.0.0%2Fignite-ut-job1-1.0-SNAPSHOT.jar)
 contains two classes 
 which are used in tests:
 * `org.apache.ignite.internal.compute.unit1.Unit1` - `extends 
org.apache.ignite.compute.ComputeJob` and returns 1 as Integer.
 * `org.my.job.compute.unit.Job1Utility`
 
-[unit2-1.0-SNAPSHOT.jar](resources%2Funits%2Funit2%2F2.0.0%2Funit2-1.0-SNAPSHOT.jar)
 contains two classes
+[ignite-ut-job2-1.0-SNAPSHOT.jar](resources%2Funits%2Funit1%2F2.0.0%2Fignite-ut-job2-1.0-SNAPSHOT.jar)
 contains two classes
 which are used in tests:
 * `org.apache.ignite.internal.compute.unit1.Unit2` - extends 
`org.apache.ignite.compute.ComputeJob` and returns "Hello World!" as String.
 * `org.my.job.compute.unit.Job2Utility`
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
index b36269235f..53d2a94c3c 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
@@ -38,7 +38,7 @@ class ClassLoaderExceptionsMapperTest {
                 mapClassLoaderExceptions(failedFuture(toBeThrown), 
"com.example.Main"),
                 willThrow(
                         ClassNotFoundException.class,
-                        "com.example.Main. Deployment unit id:1.0.0 doesn’t 
exist"
+                        "com.example.Main. Deployment unit id:1.0.0 doesn't 
exist"
                 )
         );
     }
@@ -56,7 +56,7 @@ class ClassLoaderExceptionsMapperTest {
                 mapClassLoaderExceptions(failedFuture(toBeThrown), 
"com.example.Main"),
                 willThrow(
                         ClassNotFoundException.class,
-                        "com.example.Main. Deployment unit id:1.0.0 can’t be 
used:"
+                        "com.example.Main. Deployment unit id:1.0.0 can't be 
used:"
                                 + " [clusterStatus = OBSOLETE, nodeStatus = 
REMOVING]"
                 )
         );
diff --git 
a/modules/compute/src/test/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip 
b/modules/compute/src/test/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
similarity index 100%
rename from 
modules/compute/src/test/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
rename to 
modules/compute/src/test/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
diff --git 
a/modules/compute/src/test/resources/units/unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar
 and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git 
a/modules/compute/src/test/resources/units/unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
 
b/modules/compute/src/test/resources/units/unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and 
b/modules/compute/src/test/resources/units/unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/compute/src/test/resources/units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar 
b/modules/compute/src/test/resources/units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files 
a/modules/compute/src/test/resources/units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar 
and /dev/null differ
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp 
b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index cff645019b..7aef5fcda3 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -311,7 +311,7 @@ TEST_F(compute_test, unknown_unit) {
                 auto cluster_nodes = m_client.get_cluster_nodes();
                 (void) m_client.get_compute().execute(cluster_nodes, 
{{"unknown"}}, NODE_NAME_JOB, {});
             } catch (const ignite_error &e) {
-                EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment 
unit unknown:latest doesn’t exist"));
+                EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment 
unit unknown:latest doesn't exist"));
                 throw;
             }
         },
@@ -325,7 +325,7 @@ TEST_F(compute_test, execute_unknown_unit_and_version) {
                 auto cluster_nodes = m_client.get_cluster_nodes();
                 (void) m_client.get_compute().execute(cluster_nodes, 
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {});
             } catch (const ignite_error &e) {
-                EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment 
unit unknown:1.2.3 doesn’t exist"));
+                EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment 
unit unknown:1.2.3 doesn't exist"));
                 throw;
             }
         },
@@ -339,7 +339,7 @@ TEST_F(compute_test, 
execute_colocated_unknown_unit_and_version) {
                 auto comp = m_client.get_compute();
                 (void) comp.execute_colocated(TABLE_1, get_tuple(1), 
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {});
             } catch (const ignite_error &e) {
-                EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment 
unit unknown:1.2.3 doesn’t exist"));
+                EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment 
unit unknown:1.2.3 doesn't exist"));
                 throw;
             }
         },
@@ -353,7 +353,7 @@ TEST_F(compute_test, broadcast_unknown_unit_and_version) {
 
     auto &res1 = res[get_node(1)];
     ASSERT_TRUE(res1.has_error());
-    EXPECT_THAT(res1.error().what_str(), ::testing::HasSubstr("Deployment unit 
unknown:1.2.3 doesn’t exist"));
+    EXPECT_THAT(res1.error().what_str(), ::testing::HasSubstr("Deployment unit 
unknown:1.2.3 doesn't exist"));
 }
 
 TEST_F(compute_test, execute_empty_unit_name) {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 5a610e3b27..33cce325be 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -350,7 +350,7 @@ namespace Apache.Ignite.Tests.Compute
             var ex = Assert.ThrowsAsync<IgniteException>(
                 async () => await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), deploymentUnits, NodeNameJob));
 
-            StringAssert.Contains("Deployment unit unit-latest:latest doesn’t 
exist", ex!.Message);
+            StringAssert.Contains("Deployment unit unit-latest:latest doesn't 
exist", ex!.Message);
         }
 
         [Test]
@@ -362,7 +362,7 @@ namespace Apache.Ignite.Tests.Compute
             var ex = Assert.ThrowsAsync<IgniteException>(
                 async () => await 
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, 
deploymentUnits, NodeNameJob));
 
-            StringAssert.Contains("Deployment unit unit-latest:latest doesn’t 
exist", ex!.Message);
+            StringAssert.Contains("Deployment unit unit-latest:latest doesn't 
exist", ex!.Message);
         }
 
         [Test]
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 4089cc0768..f79752c390 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -18,11 +18,14 @@
 package org.apache.ignite.internal.compute;
 
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
 import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -49,7 +52,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
 
     @BeforeEach
     void setUp() throws IOException {
-        deployJar(node(0), unit.name(), unit.version(), 
"ignite-jobs-1.0-SNAPSHOT.jar");
+        deployJar(node(0), unit.name(), unit.version(), 
"ignite-it-jobs-1.0-SNAPSHOT.jar");
     }
 
     @Override
@@ -103,7 +106,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
                 result,
                 willThrow(
                         ClassNotFoundException.class,
-                        "org.example.ConcatJob. Deployment unit 
non-existing:1.0.0 doesn’t exist"
+                        "org.example.ConcatJob. Deployment unit 
non-existing:1.0.0 doesn't exist"
                 )
         );
     }
@@ -115,20 +118,58 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
         IgniteImpl entryNode = node(0);
 
         DeploymentUnit firstVersion = new DeploymentUnit("latest-unit", 
Version.parseVersion("1.0.0"));
-        deployJar(entryNode, firstVersion.name(), firstVersion.version(), 
"unit1-1.0-SNAPSHOT.jar");
+        deployJar(entryNode, firstVersion.name(), firstVersion.version(), 
"ignite-ut-job1-1.0-SNAPSHOT.jar");
 
         CompletableFuture<Integer> result1 = entryNode.compute()
                 .execute(Set.of(entryNode.node()), jobUnits, 
"org.my.job.compute.unit.UnitJob");
         assertThat(result1, willBe(1));
 
         DeploymentUnit secondVersion = new DeploymentUnit("latest-unit", 
Version.parseVersion("1.0.1"));
-        deployJar(entryNode, secondVersion.name(), secondVersion.version(), 
"unit2-1.0-SNAPSHOT.jar");
+        deployJar(entryNode, secondVersion.name(), secondVersion.version(), 
"ignite-ut-job2-1.0-SNAPSHOT.jar");
 
         CompletableFuture<String> result2 = entryNode.compute()
                 .execute(Set.of(entryNode.node()), jobUnits, 
"org.my.job.compute.unit.UnitJob");
         assertThat(result2, willBe("Hello World!"));
     }
 
+    @Test
+    void undeployAcquiredUnit() {
+        IgniteImpl entryNode = node(0);
+        CompletableFuture<Void> job = 
entryNode.compute().execute(Set.of(entryNode.node()), units, 
"org.example.SleepJob", 3L);
+
+        assertThat(entryNode.deployment().undeployAsync(unit.name(), 
unit.version()), willCompleteSuccessfully());
+
+        assertThat(entryNode.deployment().clusterStatusAsync(unit.name(), 
unit.version()), willBe(OBSOLETE));
+        assertThat(entryNode.deployment().nodeStatusAsync(unit.name(), 
unit.version()), willBe(OBSOLETE));
+
+        await().failFast("The unit must not be removed until the job is 
completed", () -> {
+            assertThat(entryNode.deployment().clusterStatusAsync(unit.name(), 
unit.version()), willBe(OBSOLETE));
+            assertThat(entryNode.deployment().nodeStatusAsync(unit.name(), 
unit.version()), willBe(OBSOLETE));
+        }).until(() -> job, willCompleteSuccessfully());
+
+        await().until(
+                () -> entryNode.deployment().clusterStatusAsync(unit.name(), 
unit.version()),
+                willBe(nullValue())
+        );
+    }
+
+    @Test
+    void executeJobWithObsoleteUnit() {
+        IgniteImpl entryNode = node(0);
+        CompletableFuture<Void> successJob = 
entryNode.compute().execute(Set.of(entryNode.node()), units, 
"org.example.SleepJob", 2L);
+
+        assertThat(entryNode.deployment().undeployAsync(unit.name(), 
unit.version()), willCompleteSuccessfully());
+
+        CompletableFuture<Void> failedJob = 
entryNode.compute().execute(Set.of(entryNode.node()), units, 
"org.example.SleepJob", 2L);
+
+        assertThat(failedJob, willThrow(
+                ClassNotFoundException.class,
+                "org.example.SleepJob. Deployment unit jobs:1.0.0 can't be 
used: "
+                        + "[clusterStatus = OBSOLETE, nodeStatus = OBSOLETE]")
+        );
+        assertThat(successJob, willCompleteSuccessfully());
+    }
+
     private static void deployJar(IgniteImpl node, String unitId, Version 
unitVersion, String jarName) throws IOException {
         try (InputStream jarStream = 
ItComputeTestStandalone.class.getClassLoader().getResourceAsStream("units/" + 
jarName)) {
             CompletableFuture<Boolean> deployed = 
node.deployment().deployAsync(
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 72178a5b19..6912c90e76 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -215,7 +215,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
                         NodeNameJob.class.getName()).join());
 
         var cause = (IgniteException) ex.getCause();
-        assertThat(cause.getMessage(), containsString("Deployment unit 
u:latest doesn’t exist"));
+        assertThat(cause.getMessage(), containsString("Deployment unit 
u:latest doesn't exist"));
 
         // TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does 
not propagate to client.
         assertEquals(INTERNAL_ERR, cause.code());
@@ -232,7 +232,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
                         NodeNameJob.class.getName()).join());
 
         var cause = (IgniteException) ex.getCause();
-        assertThat(cause.getMessage(), containsString("Deployment unit 
u:latest doesn’t exist"));
+        assertThat(cause.getMessage(), containsString("Deployment unit 
u:latest doesn't exist"));
 
         // TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does 
not propagate to client.
         assertEquals(INTERNAL_ERR, cause.code());
diff --git 
a/modules/runner/src/integrationTest/resources/units/ignite-it-jobs-1.0-SNAPSHOT.jar
 
b/modules/runner/src/integrationTest/resources/units/ignite-it-jobs-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..7700570180
Binary files /dev/null and 
b/modules/runner/src/integrationTest/resources/units/ignite-it-jobs-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/runner/src/integrationTest/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
 
b/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
similarity index 100%
rename from 
modules/runner/src/integrationTest/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
rename to 
modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
diff --git 
a/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT.jar
 
b/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT.jar
deleted file mode 100644
index e15cca0bf1..0000000000
Binary files 
a/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT.jar
 and /dev/null differ
diff --git 
a/modules/runner/src/integrationTest/resources/units/ignite-ut-job1-1.0-SNAPSHOT.jar
 
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and 
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job1-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/runner/src/integrationTest/resources/units/ignite-ut-job2-1.0-SNAPSHOT.jar
 
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and 
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job2-1.0-SNAPSHOT.jar
 differ
diff --git 
a/modules/runner/src/integrationTest/resources/units/unit1-1.0-SNAPSHOT.jar 
b/modules/runner/src/integrationTest/resources/units/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files 
a/modules/runner/src/integrationTest/resources/units/unit1-1.0-SNAPSHOT.jar and 
/dev/null differ
diff --git 
a/modules/runner/src/integrationTest/resources/units/unit2-1.0-SNAPSHOT.jar 
b/modules/runner/src/integrationTest/resources/units/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files 
a/modules/runner/src/integrationTest/resources/units/unit2-1.0-SNAPSHOT.jar and 
/dev/null differ

Reply via email to