This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cd3a977d88 IGNITE-19519 Deployment unit removal (#2194)
cd3a977d88 is described below

commit cd3a977d8814313d1913c76d293d68e07b6a10d5
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Jun 22 13:21:54 2023 +0300

    IGNITE-19519 Deployment unit removal (#2194)
---
 .../internal/deployunit/DefaultNodeCallback.java   |  83 ++++++++++++-----
 .../deployunit/DeployMessagingService.java         |  38 --------
 .../internal/deployunit/DeploymentManagerImpl.java |  99 ++++++++++++--------
 .../internal/deployunit/FileDeployerService.java   |   4 +-
 .../deployunit/message/DeployUnitMessageTypes.java |  14 +--
 .../deployunit/message/UndeployUnitRequest.java    |  41 ---------
 .../deployunit/message/UndeployUnitResponse.java   |  28 ------
 ...ventCallback.java => ClusterEventCallback.java} |  30 +++----
 .../metastore/ClusterEventCallbackImpl.java        | 100 +++++++++++++++++++++
 .../metastore/ClusterStatusWatchListener.java      |  58 ++++++++++++
 .../metastore/DeploymentUnitFailover.java          |  86 ++++++++++++------
 .../deployunit/metastore/DeploymentUnitStore.java  |  47 +++++++++-
 .../metastore/DeploymentUnitStoreImpl.java         |  58 ++++++------
 .../deployunit/metastore/NodeEventCallback.java    |  23 ++---
 .../metastore/NodeStatusWatchListener.java         |  16 ++--
 .../metastore/DeploymentUnitStoreImplTest.java     |  56 +++++++++---
 .../deployment/ItDeploymentUnitFailoverTest.java   |  65 ++++++++------
 .../internal/deployment/ItDeploymentUnitTest.java  |  80 ++++++++---------
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 19 files changed, 568 insertions(+), 361 deletions(-)

diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index 79248e49a2..a84f1e858a 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -18,62 +18,69 @@
 package org.apache.ignite.internal.deployunit;
 
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
-import org.apache.ignite.network.ClusterService;
 
 /**
  * Default implementation of {@link NodeEventCallback}.
  */
-public class DefaultNodeCallback implements NodeEventCallback {
+public class DefaultNodeCallback extends NodeEventCallback {
     private final DeploymentUnitStore deploymentUnitStore;
 
     private final DeployMessagingService messaging;
 
     private final FileDeployerService deployer;
 
-    private final ClusterService clusterService;
-
     private final DownloadTracker tracker;
 
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final String nodeName;
+
     /**
      * Constructor.
      *
      * @param deploymentUnitStore Deployment units store.
      * @param messaging Deployment messaging service.
      * @param deployer Deployment unit file system service.
-     * @param clusterService Cluster service.
+     * @param cmgManager Cluster management group manager.
+     * @param nodeName Node consistent ID.
      */
     public DefaultNodeCallback(
             DeploymentUnitStore deploymentUnitStore,
             DeployMessagingService messaging,
             FileDeployerService deployer,
-            ClusterService clusterService,
-            DownloadTracker tracker
+            DownloadTracker tracker,
+            ClusterManagementGroupManager cmgManager,
+            String nodeName
     ) {
         this.deploymentUnitStore = deploymentUnitStore;
         this.messaging = messaging;
         this.deployer = deployer;
-        this.clusterService = clusterService;
         this.tracker = tracker;
+        this.cmgManager = cmgManager;
+        this.nodeName = nodeName;
     }
 
     @Override
-    public void onUploading(UnitNodeStatus status, List<String> holders) {
-        tracker.track(status.id(), status.version(),
-                () -> messaging.downloadUnitContent(status.id(), 
status.version(), holders)
-                        .thenCompose(content -> deployer.deploy(status.id(), 
status.version(), content))
+    public void onUploading(String id, Version version, List<UnitNodeStatus> 
holders) {
+        tracker.track(id, version,
+                () -> messaging.downloadUnitContent(id, version, new 
ArrayList<>(getDeployedNodeIds(holders)))
+                        .thenCompose(content -> deployer.deploy(id, version, 
content))
                         .thenApply(deployed -> {
                             if (deployed) {
-                                return deploymentUnitStore.updateNodeStatus(
-                                        getLocalNodeId(),
-                                        status.id(),
-                                        status.version(),
-                                        DEPLOYED);
+                                return 
deploymentUnitStore.updateNodeStatus(nodeName, id, version, DEPLOYED);
                             }
                             return deployed;
                         })
@@ -81,18 +88,48 @@ public class DefaultNodeCallback implements 
NodeEventCallback {
     }
 
     @Override
-    public void onDeploy(UnitNodeStatus status, List<String> holders) {
-        deploymentUnitStore.getClusterStatus(status.id(), status.version())
+    public void onDeploy(String id, Version version, List<UnitNodeStatus> 
holders) {
+        Set<String> nodeIds = getDeployedNodeIds(holders);
+        deploymentUnitStore.getClusterStatus(id, version)
                 .thenApply(UnitClusterStatus::initialNodesToDeploy)
-                .thenApply(holders::containsAll)
+                .thenApply(nodeIds::containsAll)
                 .thenAccept(allRequiredDeployed -> {
                     if (allRequiredDeployed) {
-                        deploymentUnitStore.updateClusterStatus(status.id(), 
status.version(), DEPLOYED);
+                        deploymentUnitStore.updateClusterStatus(id, version, 
DEPLOYED);
+                    }
+                });
+    }
+
+    @Override
+    public void onObsolete(String id, Version version, List<UnitNodeStatus> 
holders) {
+        //TODO: IGNITE-19708
+        deploymentUnitStore.updateNodeStatus(nodeName, id, version, REMOVING);
+    }
+
+    @Override
+    public void onRemoving(String id, Version version, List<UnitNodeStatus> 
holders) {
+        cmgManager.logicalTopology()
+                .thenAccept(snapshot -> {
+                    Set<String> nodes = 
snapshot.nodes().stream().map(LogicalNode::name).collect(Collectors.toSet());
+                    boolean allRemoved = holders.stream()
+                            .filter(nodeStatus -> 
nodes.contains(nodeStatus.nodeId()))
+                            .allMatch(nodeStatus -> nodeStatus.status() == 
REMOVING);
+                    if (allRemoved) {
+                        deploymentUnitStore.updateClusterStatus(id, version, 
REMOVING);
                     }
                 });
     }
 
-    private String getLocalNodeId() {
-        return clusterService.topologyService().localMember().name();
+    /**
+     * Returns a set of node IDs where unit is deployed.
+     *
+     * @param holders List of unit node statuses.
+     * @return Set of node IDs where unit is deployed.
+     */
+    private static Set<String> getDeployedNodeIds(List<UnitNodeStatus> 
holders) {
+        return holders.stream()
+                .filter(status -> status.status() == DEPLOYED)
+                .map(UnitNodeStatus::nodeId)
+                .collect(Collectors.toSet());
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index 1834907736..0584c5b6d0 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -30,13 +30,9 @@ import 
org.apache.ignite.internal.deployunit.message.DownloadUnitResponseImpl;
 import org.apache.ignite.internal.deployunit.message.StopDeployRequest;
 import org.apache.ignite.internal.deployunit.message.StopDeployRequestImpl;
 import org.apache.ignite.internal.deployunit.message.StopDeployResponseImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.network.ChannelType;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 
 /**
@@ -95,8 +91,6 @@ public class DeployMessagingService {
                 (message, senderConsistentId, correlationId) -> {
                     if (message instanceof DownloadUnitRequest) {
                         processDownloadRequest((DownloadUnitRequest) message, 
senderConsistentId, correlationId);
-                    } else if (message instanceof UndeployUnitRequest) {
-                        processUndeployRequest((UndeployUnitRequest) message, 
senderConsistentId, correlationId);
                     } else if (message instanceof StopDeployRequest) {
                         processStopDeployRequest((StopDeployRequest) message, 
senderConsistentId, correlationId);
                     }
@@ -146,27 +140,6 @@ public class DeployMessagingService {
                         ).toArray(CompletableFuture[]::new)));
     }
 
-    /**
-     * Start undeploy process from provided node with provided id and version.
-     *
-     * @param node Cluster node.
-     * @param id Deployment unit identifier.
-     * @param version Deployment unit version.
-     * @return Future with undeploy result.
-     */
-    public CompletableFuture<Void> undeploy(ClusterNode node, String id, 
Version version) {
-        return clusterService.messagingService()
-                .invoke(node,
-                        DEPLOYMENT_CHANNEL,
-                        UndeployUnitRequestImpl.builder()
-                                .id(id)
-                                .version(version.render())
-                                .build(),
-                        Long.MAX_VALUE
-                ).thenAccept(message ->
-                        LOG.info("Undeploy unit " + id + ":" + version + " 
from node " + node + " finished"));
-    }
-
     private void processStopDeployRequest(StopDeployRequest request, String 
senderConsistentId, long correlationId) {
         tracker.cancelIfDownloading(request.id(), 
Version.parseVersion(request.version()));
         clusterService.messagingService()
@@ -174,17 +147,6 @@ public class DeployMessagingService {
 
     }
 
-    private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
-        LOG.info("Start to undeploy " + executeRequest.id() + " with version " 
+ executeRequest.version() + " from "
-                + clusterService.topologyService().localMember().name());
-        deployerService.undeploy(executeRequest.id(), 
Version.parseVersion(executeRequest.version()))
-                .thenRun(() -> clusterService.messagingService()
-                        .respond(senderConsistentId,
-                                UndeployUnitResponseImpl.builder().build(),
-                                correlationId)
-                );
-    }
-
     private void processDownloadRequest(DownloadUnitRequest request, String 
senderConsistentId, long correlationId) {
         deployerService.getUnitContent(request.id(), 
Version.parseVersion(request.version()))
                 .thenApply(content -> clusterService.messagingService()
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index bfa075a318..8e6177fac7 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -22,7 +22,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
-import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 
 import java.nio.file.Path;
 import java.util.HashMap;
@@ -36,6 +35,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
@@ -43,8 +43,12 @@ import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExis
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
 import 
org.apache.ignite.internal.deployunit.exception.InvalidNodesArgumentException;
+import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
+import 
org.apache.ignite.internal.deployunit.metastore.ClusterEventCallbackImpl;
+import 
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitFailover;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
+import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
 import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -74,11 +78,6 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
      */
     private final ClusterManagementGroupManager cmgManager;
 
-    /**
-     * Cluster service.
-     */
-    private final ClusterService clusterService;
-
     /**
      * Deploy messaging service.
      */
@@ -109,6 +108,16 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
      */
     private final DeploymentUnitAccessor deploymentUnitAccessor;
 
+    private final String nodeName;
+
+    private final NodeEventCallback nodeStatusCallback;
+
+    private final NodeStatusWatchListener nodeStatusWatchListener;
+
+    private final ClusterEventCallback clusterEventCallback;
+
+    private final ClusterStatusWatchListener clusterStatusWatchListener;
+
     /**
      * Constructor.
      *
@@ -117,6 +126,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
      * @param workDir Node working directory.
      * @param configuration Deployment configuration.
      * @param cmgManager Cluster management group manager.
+     * @param nodeName Node consistent ID.
      */
     public DeploymentManagerImpl(
             ClusterService clusterService,
@@ -124,9 +134,9 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
             LogicalTopologyService logicalTopology,
             Path workDir,
             DeploymentConfiguration configuration,
-            ClusterManagementGroupManager cmgManager
+            ClusterManagementGroupManager cmgManager,
+            String nodeName
     ) {
-        this.clusterService = clusterService;
         this.deploymentUnitStore = deploymentUnitStore;
         this.configuration = configuration;
         this.cmgManager = cmgManager;
@@ -135,7 +145,16 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         deployer = new FileDeployerService();
         messaging = new DeployMessagingService(clusterService, cmgManager, 
deployer, tracker);
         deploymentUnitAccessor = new DeploymentUnitAccessorImpl(deployer);
-        failover = new DeploymentUnitFailover(logicalTopology, 
deploymentUnitStore, clusterService);
+
+        this.nodeName = nodeName;
+
+        nodeStatusCallback = new DefaultNodeCallback(deploymentUnitStore, 
messaging, deployer, tracker, cmgManager, nodeName);
+        nodeStatusWatchListener = new 
NodeStatusWatchListener(deploymentUnitStore, nodeName, nodeStatusCallback);
+
+        clusterEventCallback = new 
ClusterEventCallbackImpl(deploymentUnitStore, deployer, cmgManager, nodeName);
+        clusterStatusWatchListener = new 
ClusterStatusWatchListener(clusterEventCallback);
+
+        failover = new DeploymentUnitFailover(logicalTopology, 
deploymentUnitStore, deployer, nodeName);
     }
 
     @Override
@@ -150,6 +169,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         Objects.requireNonNull(version);
         Objects.requireNonNull(deploymentUnit);
 
+        LOG.info("Deploying {}:{} on {}", id, version, deployMode);
         return extractNodes(deployMode)
                 .thenCompose(nodesToDeploy ->
                         doDeploy(id, version, force, deploymentUnit, 
nodesToDeploy,
@@ -170,6 +190,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         Objects.requireNonNull(version);
         Objects.requireNonNull(deploymentUnit);
 
+        LOG.info("Deploying {}:{} on {}", id, version, nodes);
         return extractNodes(nodes)
                 .thenCompose(nodesToDeploy ->
                         doDeploy(id, version, force, deploymentUnit, 
nodesToDeploy,
@@ -215,9 +236,8 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         return deployToLocalNode(id, version, unitContent)
                 .thenApply(completed -> {
                     if (completed) {
-                        String localNodeId = getLocalNodeId();
                         nodesToDeploy.forEach(node -> {
-                            if (!node.equals(localNodeId)) {
+                            if (!node.equals(nodeName)) {
                                 deploymentUnitStore.createNodeStatus(node, id, 
version);
                             }
                         });
@@ -230,7 +250,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         return deployer.deploy(id, version, unitContent)
                 .thenCompose(deployed -> {
                     if (deployed) {
-                        return 
deploymentUnitStore.createNodeStatus(getLocalNodeId(), id, version, DEPLOYED);
+                        return deploymentUnitStore.createNodeStatus(nodeName, 
id, version, DEPLOYED);
                     }
                     return completedFuture(false);
                 });
@@ -241,25 +261,31 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         checkId(id);
         Objects.requireNonNull(version);
 
+        LOG.info("Undeploying {}:{}", id, version);
         return messaging.stopInProgressDeploy(id, version)
                 .thenCompose(v -> deploymentUnitStore.updateClusterStatus(id, 
version, OBSOLETE))
                 .thenCompose(success -> {
                     if (success) {
-                        //TODO: Check unit usages here. If unit used in 
compute task we cannot just remove it.
-                        return deploymentUnitStore.updateClusterStatus(id, 
version, REMOVING);
-                    }
-                    return completedFuture(false);
-                })
-                .thenCompose(success -> {
-                    if (success) {
-                        return cmgManager.logicalTopology();
+                        return cmgManager.logicalTopology()
+                                .thenCompose(logicalTopology -> {
+                                    Set<String> logicalNodes = 
logicalTopology.nodes().stream()
+                                            .map(LogicalNode::name)
+                                            .collect(Collectors.toSet());
+                                    // Set OBSOLETE status only to nodes which 
are present in the topology
+                                    return deploymentUnitStore.getAllNodes(id, 
version)
+                                            .thenCompose(nodes -> 
allOf(nodes.stream()
+                                                    
.filter(logicalNodes::contains)
+                                                    .map(node -> 
deploymentUnitStore.updateNodeStatus(node, id, version, OBSOLETE))
+                                                    
.toArray(CompletableFuture[]::new)))
+                                            .thenApply(v -> {
+                                                // Now the nodes are handling 
the OBSOLETE node statuses and when all nodes are in the
+                                                // REMOVING status, the 
cluster status will be changed to REMOVING
+                                                return true;
+                                            });
+                                });
                     }
                     return failedFuture(new 
DeploymentUnitNotFoundException(id, version));
-                }).thenCompose(logicalTopologySnapshot -> allOf(
-                        logicalTopologySnapshot.nodes().stream()
-                                .map(node -> messaging.undeploy(node, id, 
version))
-                                .toArray(CompletableFuture[]::new))
-                ).thenCompose(unused -> deploymentUnitStore.remove(id, 
version));
+                });
     }
 
     @Override
@@ -297,7 +323,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
 
     @Override
     public CompletableFuture<List<UnitStatuses>> nodeStatusesAsync() {
-        return deploymentUnitStore.getNodeStatuses(getLocalNodeId())
+        return deploymentUnitStore.getNodeStatuses(nodeName)
                 .thenApply(DeploymentManagerImpl::fromUnitStatuses);
     }
 
@@ -305,7 +331,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
     public CompletableFuture<UnitStatuses> nodeStatusesAsync(String id) {
         checkId(id);
 
-        return deploymentUnitStore.getNodeStatuses(getLocalNodeId(), id)
+        return deploymentUnitStore.getNodeStatuses(nodeName, id)
                 .thenApply(statuses -> fromUnitStatuses(id, statuses));
     }
 
@@ -314,7 +340,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
         checkId(id);
         Objects.requireNonNull(version);
 
-        return deploymentUnitStore.getNodeStatus(getLocalNodeId(), id, version)
+        return deploymentUnitStore.getNodeStatus(nodeName, id, version)
                 .thenApply(DeploymentManagerImpl::extractDeploymentStatus);
     }
 
@@ -325,7 +351,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                     if (nodes.isEmpty()) {
                         return completedFuture(false);
                     }
-                    if (nodes.contains(getLocalNodeId())) {
+                    if (nodes.contains(nodeName)) {
                         return completedFuture(true);
                     }
                     return messaging.downloadUnitContent(id, version, nodes)
@@ -347,23 +373,18 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
 
     @Override
     public void start() {
-        DefaultNodeCallback callback = new 
DefaultNodeCallback(deploymentUnitStore, messaging, deployer, clusterService, 
tracker);
         
deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value()));
-        deploymentUnitStore.registerListener(new NodeStatusWatchListener(
-                deploymentUnitStore,
-                this::getLocalNodeId,
-                callback));
+        
deploymentUnitStore.registerNodeStatusListener(nodeStatusWatchListener);
+        
deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener);
         messaging.subscribe();
-        failover.registerTopologyChangeCallback(callback);
+        failover.registerTopologyChangeCallback(nodeStatusCallback, 
clusterEventCallback);
     }
 
     @Override
     public void stop() throws Exception {
         tracker.cancelAll();
-    }
-
-    private String getLocalNodeId() {
-        return clusterService.topologyService().localMember().name();
+        
deploymentUnitStore.unregisterNodeStatusListener(nodeStatusWatchListener);
+        
deploymentUnitStore.unregisterClusterStatusListener(clusterStatusWatchListener);
     }
 
     private static void checkId(String id) {
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index a557b33c1c..a22d09381c 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -108,7 +108,7 @@ public class FileDeployerService {
                 IgniteUtils.deleteIfExistsThrowable(unitPath(id, version));
                 return true;
             } catch (IOException e) {
-                LOG.debug("Failed to get content for unit " + id + ":" + 
version, e);
+                LOG.debug("Failed to undeploy unit " + id + ":" + version, e);
                 return false;
             }
         }, executor);
@@ -133,7 +133,7 @@ public class FileDeployerService {
                     }
                 });
             } catch (IOException e) {
-                LOG.debug("Failed to undeploy unit " + id + ":" + version, e);
+                LOG.debug("Failed to get content for unit " + id + ":" + 
version, e);
             }
             return new UnitContent(result);
         }, executor);
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
index 43985954c2..43661fd994 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
@@ -35,23 +35,13 @@ public class DeployUnitMessageTypes {
      */
     public static final short DOWNLOAD_UNIT_RESPONSE = 1;
 
-    /**
-     * Message type for {@link UndeployUnitRequest}.
-     */
-    public static final short UNDEPLOY_UNIT_REQUEST = 2;
-
-    /**
-     * Message type for {@link UndeployUnitResponse}.
-     */
-    public static final short UNDEPLOY_UNIT_RESPONSE = 3;
-
     /**
      * Message type for {@link StopDeployRequest}.
      */
-    public static final short STOP_DEPLOY_REQUEST = 4;
+    public static final short STOP_DEPLOY_REQUEST = 2;
 
     /**
      * Message type for {@link StopDeployResponse}.
      */
-    public static final short STOP_DEPLOY_RESPONSE = 5;
+    public static final short STOP_DEPLOY_RESPONSE = 3;
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitRequest.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitRequest.java
deleted file mode 100644
index 4c4f847670..0000000000
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitRequest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.deployunit.message;
-
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/**
- * Undeploy unit request.
- */
-@Transferable(DeployUnitMessageTypes.UNDEPLOY_UNIT_REQUEST)
-public interface UndeployUnitRequest extends NetworkMessage {
-    /**
-     * Returns id of deployment unit.
-     *
-     * @return id of deployment unit.
-     */
-    String id();
-
-    /**
-     * Returns version of deployment unit.
-     *
-     * @return version of deployment unit.
-     */
-    String version();
-}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
deleted file mode 100644
index 3eccbb243f..0000000000
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.deployunit.message;
-
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/**
- * Undeploy unit response.
- */
-@Transferable(DeployUnitMessageTypes.UNDEPLOY_UNIT_RESPONSE)
-public interface UndeployUnitResponse extends NetworkMessage {
-}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallback.java
similarity index 65%
copy from 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
copy to 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallback.java
index a74c31be0c..dfedfdd653 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallback.java
@@ -17,51 +17,49 @@
 
 package org.apache.ignite.internal.deployunit.metastore;
 
-import java.util.List;
-import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 
 /**
- * Listener of deployment unit node status changes.
+ * Listener of deployment unit cluster status changes.
  */
-public interface NodeEventCallback {
+public abstract class ClusterEventCallback {
     /**
      * Change event.
      *
      * @param status Deployment unit status.
-     * @param holders Nodes consistent id.
      */
-    default void onUpdate(UnitNodeStatus status, List<String> holders) {
+    public void onUpdate(UnitClusterStatus status) {
         switch (status.status()) {
             case UPLOADING:
-                onUploading(status, holders);
+                onUploading(status);
                 break;
             case DEPLOYED:
-                onDeploy(status, holders);
-                break;
-            case REMOVING:
-                onRemoving(status, holders);
+                onDeploy(status);
                 break;
             case OBSOLETE:
-                onObsolete(status, holders);
+                onObsolete(status);
+                break;
+            case REMOVING:
+                onRemoving(status);
                 break;
             default:
                 break;
         }
     }
 
-    default void onUploading(UnitNodeStatus status, List<String> holders) {
+    protected void onUploading(UnitClusterStatus status) {
 
     }
 
-    default void onDeploy(UnitNodeStatus status, List<String> holders) {
+    protected void onDeploy(UnitClusterStatus status) {
 
     }
 
-    default void onObsolete(UnitNodeStatus status, List<String> holders) {
+    protected void onObsolete(UnitClusterStatus status) {
 
     }
 
-    default void onRemoving(UnitNodeStatus status, List<String> holders) {
+    protected void onRemoving(UnitClusterStatus status) {
 
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
new file mode 100644
index 0000000000..b8130502e8
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.deployunit.FileDeployerService;
+import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+
+/** Listener of deployment unit cluster status changes. */
+public class ClusterEventCallbackImpl extends ClusterEventCallback {
+    private final DeploymentUnitStore deploymentUnitStore;
+
+    private final FileDeployerService deployerService;
+
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final String nodeName;
+
+    /**
+     * Constructor.
+     *
+     * @param deploymentUnitStore Deployment units store.
+     * @param deployerService Deployment unit file system service.
+     * @param cmgManager Cluster management group manager.
+     * @param nodeName Node consistent ID.
+     */
+    public ClusterEventCallbackImpl(
+            DeploymentUnitStore deploymentUnitStore,
+            FileDeployerService deployerService,
+            ClusterManagementGroupManager cmgManager,
+            String nodeName
+    ) {
+        this.deploymentUnitStore = deploymentUnitStore;
+        this.deployerService = deployerService;
+        this.cmgManager = cmgManager;
+        this.nodeName = nodeName;
+    }
+
+    @Override
+    public void onRemoving(UnitClusterStatus status) {
+        String id = status.id();
+        Version version = status.version();
+        // Now the deployment unit can be removed from each target node and, 
after it, remove corresponding status records.
+        deploymentUnitStore.getNodeStatus(nodeName, id, 
version).thenAccept(nodeStatus -> {
+            if (nodeStatus != null && nodeStatus.status() == REMOVING) {
+                undeploy(id, version);
+            }
+        });
+    }
+
+    private void undeploy(String id, Version version) {
+        deployerService.undeploy(id, version).thenAccept(success -> {
+            if (success) {
+                deploymentUnitStore.removeNodeStatus(nodeName, id, 
version).thenAccept(successRemove -> {
+                    if (successRemove) {
+                        removeClusterStatus(id, version);
+                    }
+                });
+            }
+        });
+    }
+
+    private void removeClusterStatus(String id, Version version) {
+        cmgManager.logicalTopology().thenAccept(logicalTopology -> {
+            Set<String> logicalNodes = logicalTopology.nodes().stream()
+                    .map(LogicalNode::name)
+                    .collect(Collectors.toSet());
+            deploymentUnitStore.getAllNodes(id, version).thenAccept(nodes -> {
+                boolean emptyTopology = nodes.stream()
+                        .filter(logicalNodes::contains)
+                        .findAny()
+                        .isEmpty();
+                if (emptyTopology) {
+                    deploymentUnitStore.removeClusterStatus(id, version);
+                }
+            });
+        });
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterStatusWatchListener.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterStatusWatchListener.java
new file mode 100644
index 0000000000..af71ee6fb0
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterStatusWatchListener.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+
+/**
+ * Cluster statuses store watch listener.
+ */
+public class ClusterStatusWatchListener implements WatchListener {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ClusterStatusWatchListener.class);
+
+    private final ClusterEventCallback clusterEventCallback;
+
+    public ClusterStatusWatchListener(ClusterEventCallback 
clusterEventCallback) {
+        this.clusterEventCallback = clusterEventCallback;
+    }
+
+    @Override
+    public CompletableFuture<Void> onUpdate(WatchEvent event) {
+        for (EntryEvent e : event.entryEvents()) {
+            byte[] value = e.newEntry().value();
+            if (value != null) {
+                UnitClusterStatus unitStatus = 
UnitClusterStatus.deserialize(value);
+                clusterEventCallback.onUpdate(unitStatus);
+            }
+        }
+        return completedFuture(null);
+    }
+
+    @Override
+    public void onError(Throwable e) {
+        LOG.warn("Failed to process metastore deployment unit event. ", e);
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
index 396124500a..37c44d76d3 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
@@ -18,14 +18,20 @@
 package org.apache.ignite.internal.deployunit.metastore;
 
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 
 import java.util.Objects;
+import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.internal.deployunit.DeploymentStatus;
+import org.apache.ignite.internal.deployunit.FileDeployerService;
+import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 
 /**
  * Deployment unit failover.
@@ -35,54 +41,84 @@ public class DeploymentUnitFailover {
 
     private final DeploymentUnitStore deploymentUnitStore;
 
-    private final ClusterService clusterService;
+    private final FileDeployerService deployer;
+
+    private final String nodeName;
 
     /**
      * Constructor.
      *
      * @param logicalTopology Logical topology service.
      * @param deploymentUnitStore Deployment units store.
-     * @param clusterService Cluster service.
+     * @param deployer Deployment unit file system service.
+     * @param nodeName Node consistent ID.
      */
-    public DeploymentUnitFailover(
-            LogicalTopologyService logicalTopology,
-            DeploymentUnitStore deploymentUnitStore,
-            ClusterService clusterService
-    ) {
+    public DeploymentUnitFailover(LogicalTopologyService logicalTopology, 
DeploymentUnitStore deploymentUnitStore,
+            FileDeployerService deployer, String nodeName) {
         this.logicalTopology = logicalTopology;
         this.deploymentUnitStore = deploymentUnitStore;
-        this.clusterService = clusterService;
+        this.deployer = deployer;
+        this.nodeName = nodeName;
     }
 
     /**
      * Register {@link NodeEventCallback} as topology change callback.
      *
-     * @param callback Node status callback.
+     * @param nodeEventCallback Node status callback.
+     * @param clusterEventCallback Cluster status callback.
      */
-    public void registerTopologyChangeCallback(NodeEventCallback callback) {
+    public void registerTopologyChangeCallback(NodeEventCallback 
nodeEventCallback, ClusterEventCallback clusterEventCallback) {
         logicalTopology.addEventListener(new LogicalTopologyEventListener() {
             @Override
             public void onNodeJoined(LogicalNode joinedNode, 
LogicalTopologySnapshot newTopology) {
                 String consistentId = joinedNode.name();
-                if (Objects.equals(consistentId, getLocalNodeId())) {
+                if (Objects.equals(consistentId, nodeName)) {
                     deploymentUnitStore.getNodeStatuses(consistentId)
-                            .thenAccept(nodeStatuses -> 
nodeStatuses.forEach(nodeStatus -> {
-                                if (nodeStatus.status() == UPLOADING) {
-                                    
deploymentUnitStore.getClusterStatus(nodeStatus.id(), nodeStatus.version())
-                                            .thenAccept(clusterStatus -> {
-                                                if (clusterStatus.status() == 
UPLOADING || clusterStatus.status() == DEPLOYED) {
-                                                    
deploymentUnitStore.getAllNodes(nodeStatus.id(), nodeStatus.version())
-                                                            .thenAccept(nodes 
-> callback.onUploading(nodeStatus, nodes));
-                                                }
-                                            });
-                                }
-                            }));
+                            .thenAccept(nodeStatuses -> 
nodeStatuses.forEach(unitNodeStatus ->
+                                    
deploymentUnitStore.getClusterStatus(unitNodeStatus.id(), 
unitNodeStatus.version())
+                                            .thenAccept(unitClusterStatus ->
+                                                    
processStatus(unitClusterStatus, unitNodeStatus, nodeEventCallback))));
                 }
             }
         });
     }
 
-    private String getLocalNodeId() {
-        return clusterService.topologyService().localMember().name();
+    private void processStatus(UnitClusterStatus unitClusterStatus, 
UnitNodeStatus unitNodeStatus, NodeEventCallback nodeEventCallback) {
+        String id = unitNodeStatus.id();
+        Version version = unitNodeStatus.version();
+        if (unitClusterStatus == null) {
+            deployer.undeploy(id, version)
+                    .thenAccept(success -> {
+                        if (success) {
+                            deploymentUnitStore.removeNodeStatus(nodeName, id, 
version);
+                        }
+                    });
+            return;
+        }
+        DeploymentStatus clusterStatus = unitClusterStatus.status();
+        DeploymentStatus nodeStatus = unitNodeStatus.status();
+        switch (clusterStatus) {
+            case UPLOADING: // fallthrough
+            case DEPLOYED:
+                if (nodeStatus == UPLOADING) {
+                    deploymentUnitStore.getAllNodeStatuses(id, version)
+                            .thenAccept(nodes -> 
nodeEventCallback.onUpdate(unitNodeStatus, nodes));
+                }
+                break;
+            case OBSOLETE:
+                if (nodeStatus == DEPLOYED || nodeStatus == OBSOLETE) {
+                    deploymentUnitStore.updateNodeStatus(nodeName, id, 
version, REMOVING);
+                }
+                break;
+            case REMOVING:
+                deploymentUnitStore.getAllNodeStatuses(id, version)
+                        .thenAccept(nodes -> {
+                            UnitNodeStatus status = new UnitNodeStatus(id, 
version, REMOVING, nodeName);
+                            nodeEventCallback.onUpdate(status, nodes);
+                        });
+                break;
+            default:
+                break;
+        }
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
index 60f18309bf..7e64f168b8 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
@@ -32,11 +32,32 @@ import 
org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
  */
 public interface DeploymentUnitStore {
     /**
-     * Register node statuses change events listener.
+     * Registers node statuses change events listener.
      *
      * @param listener Node statuses update listener.
      */
-    void registerListener(NodeStatusWatchListener listener);
+    void registerNodeStatusListener(NodeStatusWatchListener listener);
+
+    /**
+     * Unregisters node statuses change events listener.
+     *
+     * @param listener Node statuses update listener.
+     */
+    void unregisterNodeStatusListener(NodeStatusWatchListener listener);
+
+    /**
+     * Registers cluster statuses change events listener.
+     *
+     * @param listener Cluster statuses update listener.
+     */
+    void registerClusterStatusListener(ClusterStatusWatchListener listener);
+
+    /**
+     * Unregisters cluster statuses change events listener.
+     *
+     * @param listener Cluster statuses update listener.
+     */
+    void unregisterClusterStatusListener(ClusterStatusWatchListener listener);
 
     /**
      * Returns cluster statuses of all existed deployment units.
@@ -154,11 +175,29 @@ public interface DeploymentUnitStore {
     CompletableFuture<List<String>> getAllNodes(String id, Version version);
 
     /**
-     * Removes all data for deployment unit.
+     * Returns a list of node statuses where unit with provided identifier and 
version is deployed.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @return A list of node statuses where unit with provided identifier and 
version is deployed or empty list.
+     */
+    CompletableFuture<List<UnitNodeStatus>> getAllNodeStatuses(String id, 
Version version);
+
+    /**
+     * Removes cluster status.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment version identifier.
+     * @return Future with {@code true} result if removed successfully.
+     */
+    CompletableFuture<Boolean> removeClusterStatus(String id, Version version);
+
+    /**
+     * Removes node status.
      *
      * @param id Deployment unit identifier.
      * @param version Deployment version identifier.
      * @return Future with {@code true} result if removed successfully.
      */
-    CompletableFuture<Boolean> remove(String id, Version version);
+    CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, 
Version version);
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
index 6c6cd51b60..b4a7f30016 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
@@ -26,9 +26,6 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -38,16 +35,12 @@ import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import 
org.apache.ignite.internal.deployunit.metastore.accumulator.ClusterStatusAccumulator;
-import 
org.apache.ignite.internal.deployunit.metastore.accumulator.KeyAccumulator;
 import 
org.apache.ignite.internal.deployunit.metastore.accumulator.NodeStatusAccumulator;
 import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
 import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Condition;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.lang.ByteArray;
 
@@ -67,10 +60,25 @@ public class DeploymentUnitStoreImpl implements 
DeploymentUnitStore {
     }
 
     @Override
-    public void registerListener(NodeStatusWatchListener listener) {
+    public void registerNodeStatusListener(NodeStatusWatchListener listener) {
         
metaStorage.registerPrefixWatch(NodeStatusKey.builder().build().toByteArray(), 
listener);
     }
 
+    @Override
+    public void unregisterNodeStatusListener(NodeStatusWatchListener listener) 
{
+        metaStorage.unregisterWatch(listener);
+    }
+
+    @Override
+    public void registerClusterStatusListener(ClusterStatusWatchListener 
listener) {
+        
metaStorage.registerPrefixWatch(ClusterStatusKey.builder().build().toByteArray(),
 listener);
+    }
+
+    @Override
+    public void unregisterClusterStatusListener(ClusterStatusWatchListener 
listener) {
+        metaStorage.unregisterWatch(listener);
+    }
+
     @Override
     public CompletableFuture<List<UnitClusterStatus>> getClusterStatuses() {
         CompletableFuture<List<UnitClusterStatus>> result = new 
CompletableFuture<>();
@@ -182,31 +190,25 @@ public class DeploymentUnitStoreImpl implements 
DeploymentUnitStore {
     }
 
     @Override
-    public CompletableFuture<Boolean> remove(String id, Version version) {
-        ByteArray key = 
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
-        CompletableFuture<List<byte[]>> nodesFuture = new 
CompletableFuture<>();
-        
metaStorage.prefix(NodeStatusKey.builder().id(id).version(version).build().toByteArray())
-                .subscribe(new KeyAccumulator().toSubscriber(nodesFuture));
-
-        return nodesFuture.thenCompose(nodes ->
-                metaStorage.invoke(existsAll(key, nodes), removeAll(key, 
nodes), Collections.emptyList())
-        );
+    public CompletableFuture<List<UnitNodeStatus>> getAllNodeStatuses(String 
id, Version version) {
+        CompletableFuture<List<UnitNodeStatus>> result = new 
CompletableFuture<>();
+        ByteArray nodes = 
NodeStatusKey.builder().id(id).version(version).build().toByteArray();
+        metaStorage.prefix(nodes).subscribe(new 
NodeStatusAccumulator().toSubscriber(result));
+        return result;
     }
 
-    private static Condition existsAll(ByteArray key, List<byte[]> nodeKeys) {
-        Condition result = exists(key);
-        for (byte[] keyArr : nodeKeys) {
-            result = Conditions.and(result, exists(new ByteArray(keyArr)));
-        }
-        return result;
+    @Override
+    public CompletableFuture<Boolean> removeClusterStatus(String id, Version 
version) {
+        ByteArray key = 
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
+
+        return metaStorage.invoke(exists(key), Operations.remove(key), noop());
     }
 
-    private static Collection<Operation> removeAll(ByteArray key, List<byte[]> 
keys) {
-        List<Operation> operations = new ArrayList<>();
-        operations.add(Operations.remove(key));
+    @Override
+    public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String 
id, Version version) {
+        ByteArray key = 
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray();
 
-        
keys.stream().map(ByteArray::new).map(Operations::remove).collect(Collectors.toCollection(()
 -> operations));
-        return operations;
+        return metaStorage.invoke(exists(key), Operations.remove(key), noop());
     }
 
     /**
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
index a74c31be0c..6e10894de0 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
@@ -18,50 +18,51 @@
 package org.apache.ignite.internal.deployunit.metastore;
 
 import java.util.List;
+import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 
 /**
  * Listener of deployment unit node status changes.
  */
-public interface NodeEventCallback {
+public abstract class NodeEventCallback {
     /**
      * Change event.
      *
      * @param status Deployment unit status.
-     * @param holders Nodes consistent id.
+     * @param holders Node statuses.
      */
-    default void onUpdate(UnitNodeStatus status, List<String> holders) {
+    public void onUpdate(UnitNodeStatus status, List<UnitNodeStatus> holders) {
         switch (status.status()) {
             case UPLOADING:
-                onUploading(status, holders);
+                onUploading(status.id(), status.version(), holders);
                 break;
             case DEPLOYED:
-                onDeploy(status, holders);
+                onDeploy(status.id(), status.version(), holders);
                 break;
             case REMOVING:
-                onRemoving(status, holders);
+                onRemoving(status.id(), status.version(), holders);
                 break;
             case OBSOLETE:
-                onObsolete(status, holders);
+                onObsolete(status.id(), status.version(), holders);
                 break;
             default:
                 break;
         }
     }
 
-    default void onUploading(UnitNodeStatus status, List<String> holders) {
+    protected void onUploading(String id, Version version, 
List<UnitNodeStatus> holders) {
 
     }
 
-    default void onDeploy(UnitNodeStatus status, List<String> holders) {
+    protected void onDeploy(String id, Version version, List<UnitNodeStatus> 
holders) {
 
     }
 
-    default void onObsolete(UnitNodeStatus status, List<String> holders) {
+    protected void onObsolete(String id, Version version, List<UnitNodeStatus> 
holders) {
 
     }
 
-    default void onRemoving(UnitNodeStatus status, List<String> holders) {
+    protected void onRemoving(String id, Version version, List<UnitNodeStatus> 
holders) {
 
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
index 67a2d684f3..4b8ff48294 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
@@ -23,7 +23,6 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Supplier;
 import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -42,7 +41,7 @@ public class NodeStatusWatchListener implements WatchListener 
{
 
     private final DeploymentUnitStore deploymentUnitStore;
 
-    private final Supplier<String> localNodeProvider;
+    private final String nodeName;
 
     private final NodeEventCallback callback;
 
@@ -53,14 +52,12 @@ public class NodeStatusWatchListener implements 
WatchListener {
      * Constructor.
      *
      * @param deploymentUnitStore Unit statuses store.
-     * @param localNodeProvider Node consistent identifier provider.
+     * @param nodeName Node consistent ID.
      * @param callback Node event callback.
      */
-    public NodeStatusWatchListener(DeploymentUnitStore deploymentUnitStore,
-            Supplier<String> localNodeProvider,
-            NodeEventCallback callback) {
+    public NodeStatusWatchListener(DeploymentUnitStore deploymentUnitStore, 
String nodeName, NodeEventCallback callback) {
         this.deploymentUnitStore = deploymentUnitStore;
-        this.localNodeProvider = localNodeProvider;
+        this.nodeName = nodeName;
         this.callback = callback;
     }
 
@@ -74,15 +71,14 @@ public class NodeStatusWatchListener implements 
WatchListener {
 
             NodeStatusKey nodeStatusKey = NodeStatusKey.fromBytes(key);
 
-            if (!Objects.equals(localNodeProvider.get(), 
nodeStatusKey.nodeId())
-                    || value == null) {
+            if (!Objects.equals(nodeName, nodeStatusKey.nodeId()) || value == 
null) {
                 continue;
             }
 
             UnitNodeStatus nodeStatus = UnitNodeStatus.deserialize(value);
 
             CompletableFuture.supplyAsync(() -> nodeStatus, executor)
-                    .thenComposeAsync(status -> 
deploymentUnitStore.getAllNodes(status.id(), status.version()), executor)
+                    .thenComposeAsync(status -> 
deploymentUnitStore.getAllNodeStatuses(status.id(), status.version()), executor)
                     .thenAccept(nodes -> callback.onUpdate(nodeStatus, nodes));
         }
         return completedFuture(null);
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 234a12ac6c..7405e4ca5d 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -34,6 +35,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
+import 
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
 import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
 import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
@@ -47,7 +50,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -61,12 +63,21 @@ public class DeploymentUnitStoreImplTest {
 
     private final VaultManager vaultManager = new VaultManager(new 
InMemoryVaultService());
 
-    private final List<UnitNodeStatus> history = 
Collections.synchronizedList(new ArrayList<>());
+    private final List<UnitNodeStatus> nodeHistory = 
Collections.synchronizedList(new ArrayList<>());
 
-    private final NodeEventCallback listener = new NodeEventCallback() {
+    private final NodeEventCallback nodeEventCallback = new 
NodeEventCallback() {
         @Override
-        public void onUpdate(UnitNodeStatus status, List<String> holders) {
-            history.add(status);
+        public void onUpdate(UnitNodeStatus status, List<UnitNodeStatus> 
holders) {
+            nodeHistory.add(status);
+        }
+    };
+
+    private final List<UnitClusterStatus> clusterHistory = 
Collections.synchronizedList(new ArrayList<>());
+
+    private final ClusterEventCallback clusterEventCallback = new 
ClusterEventCallback() {
+        @Override
+        public void onUpdate(UnitClusterStatus status) {
+            clusterHistory.add(status);
         }
     };
 
@@ -77,12 +88,14 @@ public class DeploymentUnitStoreImplTest {
 
     @BeforeEach
     public void setup() {
-        history.clear();
+        nodeHistory.clear();
+        clusterHistory.clear();
         KeyValueStorage storage = new RocksDbKeyValueStorage("test", workDir);
 
         MetaStorageManager metaStorageManager = 
StandaloneMetaStorageManager.create(vaultManager, storage);
         metastore = new DeploymentUnitStoreImpl(metaStorageManager);
-        metastore.registerListener(new NodeStatusWatchListener(metastore, () 
-> LOCAL_NODE, listener));
+        metastore.registerNodeStatusListener(new 
NodeStatusWatchListener(metastore, LOCAL_NODE, nodeEventCallback));
+        metastore.registerClusterStatusListener(new 
ClusterStatusWatchListener(clusterEventCallback));
 
         vaultManager.start();
         metaStorageManager.start();
@@ -104,7 +117,7 @@ public class DeploymentUnitStoreImplTest {
         assertThat(metastore.getClusterStatus(id, version),
                 willBe(new UnitClusterStatus(id, version, DEPLOYED, 
Set.of())));
 
-        assertThat(metastore.remove(id, version), willBe(true));
+        assertThat(metastore.removeClusterStatus(id, version), willBe(true));
 
         assertThat(metastore.getClusterStatus(id, version), 
willBe(nullValue()));
     }
@@ -144,12 +157,12 @@ public class DeploymentUnitStoreImplTest {
                 willBe(contains((new UnitClusterStatus(id, version, DEPLOYED, 
Set.of(node1, node2, node3)))))
         );
 
-        assertThat(metastore.remove(id, version), willBe(true));
+        assertThat(metastore.removeNodeStatus(node1, id, version), 
willBe(true));
         assertThat(metastore.getNodeStatus(node1, id, version), 
willBe(nullValue()));
     }
 
     @Test
-    public void testNodeEventListener() throws InterruptedException {
+    public void testNodeEventListener() {
         String id = "id5";
         Version version = Version.parseVersion("1.1.1");
         String node1 = LOCAL_NODE;
@@ -159,12 +172,31 @@ public class DeploymentUnitStoreImplTest {
         assertThat(metastore.updateNodeStatus(node1, id, version, OBSOLETE), 
willBe(true));
         assertThat(metastore.updateNodeStatus(node1, id, version, REMOVING), 
willBe(true));
 
-        Awaitility.await().untilAsserted(() ->
-                assertThat(history, containsInAnyOrder(
+        await().untilAsserted(() ->
+                assertThat(nodeHistory, containsInAnyOrder(
                         new UnitNodeStatus(id, version, UPLOADING, node1),
                         new UnitNodeStatus(id, version, DEPLOYED, node1),
                         new UnitNodeStatus(id, version, OBSOLETE, node1),
                         new UnitNodeStatus(id, version, REMOVING, node1)
                 )));
     }
+
+    @Test
+    public void testClusterEventListener() {
+        String id = "id6";
+        Version version = Version.parseVersion("1.1.1");
+
+        assertThat(metastore.createClusterStatus(id, version, Set.of()), 
willBe(true));
+        assertThat(metastore.updateClusterStatus(id, version, DEPLOYED), 
willBe(true));
+        assertThat(metastore.updateClusterStatus(id, version, OBSOLETE), 
willBe(true));
+        assertThat(metastore.updateClusterStatus(id, version, REMOVING), 
willBe(true));
+
+        await().untilAsserted(() ->
+                assertThat(clusterHistory, containsInAnyOrder(
+                        new UnitClusterStatus(id, version, UPLOADING, 
Set.of()),
+                        new UnitClusterStatus(id, version, DEPLOYED, Set.of()),
+                        new UnitClusterStatus(id, version, OBSOLETE, Set.of()),
+                        new UnitClusterStatus(id, version, REMOVING, Set.of())
+                )));
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
index a5b68c381f..fbe3ff5c93 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
@@ -17,9 +17,13 @@
 
 package org.apache.ignite.internal.deployment;
 
-import java.util.Arrays;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -38,42 +42,51 @@ public class ItDeploymentUnitFailoverTest extends 
ClusterPerTestIntegrationTest
         files = new DeployFiles(workDir);
     }
 
-    @Override
-    protected int initialNodes() {
-        return 9;
-    }
-
-    @Override
-    protected int[] cmgMetastoreNodes() {
-        return new int[] {6, 7, 8};
-    }
-
     @Test
     public void testDeployWithNodeStop() {
-        IgniteImpl cmgNode = cluster.node(8);
-
-        List<String> cmgNodes = Arrays.stream(cmgMetastoreNodes())
-                .mapToObj(i -> node(i).name())
-                .collect(Collectors.toList());
+        int nodeIndex = 1;
+        IgniteImpl node = node(nodeIndex);
+        IgniteImpl cmgNode = node(0);
 
-        // Deploy to all CMG nodes, not only to majority
+        // Deploy to majority and additional node
         Unit big = files.deployAndVerify(
                 "id1",
                 Version.parseVersion("1.0.0"),
                 false,
                 List.of(files.bigFile()),
                 null,
-                cmgNodes,
-                cluster.node(3)
+                List.of(node.name()),
+                cmgNode
         );
 
-        stopNode(8);
+        stopNode(nodeIndex);
 
-        big.waitUnitClean(cmgNode);
-        big.waitUnitReplica(cluster.node(6));
-        big.waitUnitReplica(cluster.node(7));
-
-        cmgNode = startNode(8);
+        big.waitUnitClean(node);
         big.waitUnitReplica(cmgNode);
+
+        node = startNode(nodeIndex);
+        big.waitUnitReplica(node);
+    }
+
+    @Test
+    public void testUndeployWithNodeStop() {
+        int nodeIndex = 1;
+        String id = "id1";
+        Version version = Version.parseVersion("1.0.0");
+        Unit unit = files.deployAndVerify(
+                id, version, false,
+                List.of(files.smallFile()),
+                null, List.of(node(nodeIndex).name()),
+                node(0)
+        );
+
+        await().until(() -> 
node(nodeIndex).deployment().clusterStatusAsync(id, version), willBe(DEPLOYED));
+
+        stopNode(nodeIndex);
+
+        assertThat(unit.undeployAsync(), willCompleteSuccessfully());
+
+        IgniteImpl cmgNode = startNode(nodeIndex);
+        unit.waitUnitClean(cmgNode);
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index 42e3561178..ceb026d673 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -26,9 +26,9 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.nullValue;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.version.Version;
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import org.apache.ignite.internal.deployunit.InitialDeployMode;
 import org.apache.ignite.internal.deployunit.UnitStatuses;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -56,16 +57,16 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     @Test
     public void testDeploy() {
         String id = "test";
-        Unit unit = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), cluster.node(1));
+        Unit unit = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), node(1));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         unit.waitUnitReplica(cmg);
 
         UnitStatuses status = buildStatus(id, unit);
 
         await().timeout(2, SECONDS)
                 .pollDelay(500, MILLISECONDS)
-                .until(() -> node(2).deployment().clusterStatusesAsync(), 
willBe(Collections.singletonList(status)));
+                .until(() -> node(2).deployment().clusterStatusesAsync(), 
willBe(List.of(status)));
     }
 
     @Test
@@ -75,40 +76,40 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
                 Version.parseVersion("1.1.0"),
                 false,
                 List.of(files.smallFile(), files.mediumFile()),
-                cluster.node(1)
+                node(1)
         );
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         unit.waitUnitReplica(cmg);
 
         UnitStatuses status = buildStatus(id, unit);
 
         await().timeout(2, SECONDS)
                 .pollDelay(500, MILLISECONDS)
-                .until(() -> node(2).deployment().clusterStatusesAsync(), 
willBe(Collections.singletonList(status)));
+                .until(() -> node(2).deployment().clusterStatusesAsync(), 
willBe(List.of(status)));
     }
 
     @Test
     public void testDeployUndeploy() {
-        Unit unit = files.deployAndVerifySmall("test", 
Version.parseVersion("1.1.0"), cluster.node(1));
+        Unit unit = files.deployAndVerifySmall("test", 
Version.parseVersion("1.1.0"), node(1));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         unit.waitUnitReplica(cmg);
 
         unit.undeploy();
         unit.waitUnitClean(cmg);
 
         CompletableFuture<List<UnitStatuses>> list = 
node(2).deployment().clusterStatusesAsync();
-        assertThat(list, willBe(Collections.emptyList()));
+        assertThat(list, willBe(empty()));
     }
 
     @Test
     public void testDeployTwoUnits() {
         String id = "test";
-        Unit unit1 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), cluster.node(1));
-        Unit unit2 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.1"), cluster.node(2));
+        Unit unit1 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), node(1));
+        Unit unit2 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.1"), node(2));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         unit1.waitUnitReplica(cmg);
         unit2.waitUnitReplica(cmg);
 
@@ -125,10 +126,10 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     @Test
     public void testDeployTwoUnitsAndUndeployOne() {
         String id = "test";
-        Unit unit1 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), cluster.node(1));
-        Unit unit2 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.1"), cluster.node(2));
+        Unit unit1 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), node(1));
+        Unit unit2 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.1"), node(2));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         unit1.waitUnitReplica(cmg);
         unit2.waitUnitReplica(cmg);
 
@@ -147,12 +148,12 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     public void testDeploymentStatus() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit unit = files.deployAndVerifyMedium(id, version, cluster.node(1));
+        Unit unit = files.deployAndVerifyMedium(id, version, node(1));
 
         CompletableFuture<DeploymentStatus> status = 
node(2).deployment().clusterStatusAsync(id, version);
         assertThat(status, willBe(UPLOADING));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         unit.waitUnitReplica(cmg);
 
         await().timeout(2, SECONDS)
@@ -167,16 +168,17 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
         assertThat(node(2).deployment().clusterStatusAsync(id, version), 
willBe(nullValue()));
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19757";)
     @Test
     public void testRedeploy() {
         String id = "test";
         String version = "1.1.0";
-        Unit smallUnit = files.deployAndVerifySmall(id, 
Version.parseVersion(version), cluster.node(1));
+        Unit smallUnit = files.deployAndVerifySmall(id, 
Version.parseVersion(version), node(1));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         smallUnit.waitUnitReplica(cmg);
 
-        Unit mediumUnit = files.deployAndVerify(id, 
Version.parseVersion(version), true, List.of(files.mediumFile()), 
cluster.node(1));
+        Unit mediumUnit = files.deployAndVerify(id, 
Version.parseVersion(version), true, List.of(files.mediumFile()), node(1));
         mediumUnit.waitUnitReplica(cmg);
 
         smallUnit.waitUnitClean(smallUnit.deployedNode());
@@ -187,12 +189,12 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     public void testOnDemandDeploy() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit smallUnit = files.deployAndVerifySmall(id, version, 
cluster.node(1));
+        Unit smallUnit = files.deployAndVerifySmall(id, version, node(1));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         smallUnit.waitUnitReplica(cmg);
 
-        IgniteImpl onDemandDeployNode = cluster.node(2);
+        IgniteImpl onDemandDeployNode = node(2);
         CompletableFuture<Boolean> onDemandDeploy = 
onDemandDeployNode.deployment().onDemandDeploy(id, version);
 
         assertThat(onDemandDeploy, willBe(true));
@@ -203,12 +205,12 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     public void testOnDemandDeployToDeployedNode() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit smallUnit = files.deployAndVerifySmall(id, version, 
cluster.node(1));
+        Unit smallUnit = files.deployAndVerifySmall(id, version, node(1));
 
-        IgniteImpl cmg = cluster.node(0);
+        IgniteImpl cmg = node(0);
         smallUnit.waitUnitReplica(cmg);
 
-        IgniteImpl onDemandDeployNode = cluster.node(1);
+        IgniteImpl onDemandDeployNode = node(1);
         CompletableFuture<Boolean> onDemandDeploy = 
onDemandDeployNode.deployment().onDemandDeploy(id, version);
 
         assertThat(onDemandDeploy, willBe(true));
@@ -218,39 +220,30 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     @Test
     public void testDeployToCmg() {
         String id = "test";
-        Version version = Version.parseVersion("1.1.0");
-        Unit smallUnit = files.deployAndVerifySmall(id, version, 
cluster.node(0));
+        Unit smallUnit = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), node(0));
 
-        await().untilAsserted(() -> {
-            CompletableFuture<List<UnitStatuses>> list = 
node(0).deployment().clusterStatusesAsync();
-            assertThat(list, 
willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
-        });
+        await().until(() -> node(0).deployment().clusterStatusesAsync(id), 
willBe(buildStatus(id, smallUnit)));
     }
 
     @Test
     public void testDeployToSpecificNode() {
         String id = "test";
-        Version version = Version.parseVersion("1.1.0");
         Unit smallUnit = files.deployAndVerify(
-                id, version, false, List.of(files.smallFile()),
+                id, Version.parseVersion("1.1.0"), false, 
List.of(files.smallFile()),
                 null, List.of(node(1).name()),
                 node(0)
         );
 
         smallUnit.waitUnitReplica(node(1));
 
-        await().untilAsserted(() -> {
-            CompletableFuture<List<UnitStatuses>> list = 
node(0).deployment().clusterStatusesAsync();
-            assertThat(list, 
willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
-        });
+        await().until(() -> node(0).deployment().clusterStatusesAsync(id), 
willBe(buildStatus(id, smallUnit)));
     }
 
     @Test
     public void testDeployToAll() {
         String id = "test";
-        Version version = Version.parseVersion("1.1.0");
         Unit smallUnit = files.deployAndVerify(
-                id, version, false, List.of(files.smallFile()),
+                id, Version.parseVersion("1.1.0"), false, 
List.of(files.smallFile()),
                 InitialDeployMode.ALL, List.of(),
                 node(0)
         );
@@ -258,9 +251,6 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
         smallUnit.waitUnitReplica(node(1));
         smallUnit.waitUnitReplica(node(2));
 
-        await().untilAsserted(() -> {
-            CompletableFuture<List<UnitStatuses>> list = 
node(0).deployment().clusterStatusesAsync();
-            assertThat(list, 
willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
-        });
+        await().until(() -> node(0).deployment().clusterStatusesAsync(id), 
willBe(buildStatus(id, smallUnit)));
     }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 322252bfd5..c8d20a5375 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -539,7 +539,8 @@ public class IgniteImpl implements Ignite {
                 logicalTopologyService,
                 workDir,
                 
nodeConfigRegistry.getConfiguration(DeploymentConfiguration.KEY),
-                cmgMgr
+                cmgMgr,
+                name
         );
         deploymentManager = deploymentManagerImpl;
 

Reply via email to