This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 995eba14b3 IGNITE-19518 Validate unit after node restart (#2151)
995eba14b3 is described below

commit 995eba14b3cbe23431d75e615bd33eaae145c4a1
Author: Mikhail <[email protected]>
AuthorDate: Mon Jun 12 12:30:34 2023 +0300

    IGNITE-19518 Validate unit after node restart (#2151)
---
 .../internal/deployunit/DefaultNodeCallback.java   |  98 +++++++
 .../deployunit/DeployMessagingService.java         |   6 +-
 .../internal/deployunit/DeploymentManagerImpl.java |  59 ++---
 .../{DeployTracker.java => DownloadTracker.java}   |  24 +-
 .../metastore/DeploymentUnitFailover.java          |  88 +++++++
 .../deployunit/metastore/DeploymentUnitStore.java  |   2 -
 .../deployunit/metastore/NodeEventCallback.java    |  38 ++-
 .../metastore/NodeStatusWatchListener.java         |   7 +-
 .../metastore/DeploymentUnitStoreImplTest.java     |   7 +-
 .../java/org/apache/ignite/internal/Cluster.java   |  39 ++-
 .../internal/ClusterPerTestIntegrationTest.java    |   6 +-
 .../ignite/internal/deployment/DeployFile.java     |  56 ++++
 .../ignite/internal/deployment/DeployFiles.java    | 155 +++++++++++
 .../deployment/ItDeploymentUnitFailoverTest.java   |  62 +++++
 .../internal/deployment/ItDeploymentUnitTest.java  | 284 ++++-----------------
 .../apache/ignite/internal/deployment/Unit.java    | 129 ++++++++++
 .../apache/ignite/internal/jdbc/ItJdbcTest.java    |   2 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 18 files changed, 751 insertions(+), 312 deletions(-)

diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
new file mode 100644
index 0000000000..3d26681e33
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
+
+import java.util.List;
+import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
+import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
+import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Default implementation of {@link NodeEventCallback}.
+ */
+public class DefaultNodeCallback implements NodeEventCallback {
+    private final DeploymentUnitStore deploymentUnitStore;
+
+    private final DeployMessagingService messaging;
+
+    private final FileDeployerService deployer;
+
+    private final ClusterService clusterService;
+
+    private final DownloadTracker tracker;
+
+    /**
+     * Constructor.
+     *
+     * @param deploymentUnitStore Deployment units store.
+     * @param messaging Deployment messaging service.
+     * @param deployer Deployment unit file system service.
+     * @param clusterService Cluster service.
+     */
+    public DefaultNodeCallback(
+            DeploymentUnitStore deploymentUnitStore,
+            DeployMessagingService messaging,
+            FileDeployerService deployer,
+            ClusterService clusterService,
+            DownloadTracker tracker
+    ) {
+        this.deploymentUnitStore = deploymentUnitStore;
+        this.messaging = messaging;
+        this.deployer = deployer;
+        this.clusterService = clusterService;
+        this.tracker = tracker;
+    }
+
+    @Override
+    public void onUploading(UnitNodeStatus status, List<String> holders) {
+        tracker.track(status.id(), status.version(),
+                () -> messaging.downloadUnitContent(status.id(), 
status.version(), holders)
+                        .thenCompose(content -> deployer.deploy(status.id(), 
status.version(), content))
+                        .thenApply(deployed -> {
+                            if (deployed) {
+                                return deploymentUnitStore.updateNodeStatus(
+                                        getLocalNodeId(),
+                                        status.id(),
+                                        status.version(),
+                                        DEPLOYED);
+                            }
+                            return deployed;
+                        })
+        );
+    }
+
+    @Override
+    public void onDeploy(UnitNodeStatus status, List<String> holders) {
+        deploymentUnitStore.getClusterStatus(status.id(), status.version())
+                .thenApply(UnitClusterStatus::initialNodesToDeploy)
+                .thenApply(holders::containsAll)
+                .thenAccept(allRequiredDeployed -> {
+                    if (allRequiredDeployed) {
+                        deploymentUnitStore.updateClusterStatus(status.id(), 
status.version(), DEPLOYED);
+                    }
+                });
+    }
+
+    private String getLocalNodeId() {
+        return clusterService.topologyService().localMember().name();
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index 31d652446a..1834907736 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -65,7 +65,7 @@ public class DeployMessagingService {
     /**
      * Tracker of deploy actions.
      */
-    private final DeployTracker tracker;
+    private final DownloadTracker tracker;
 
     /**
      * Constructor.
@@ -79,7 +79,7 @@ public class DeployMessagingService {
             ClusterService clusterService,
             ClusterManagementGroupManager cmgManager,
             FileDeployerService deployerService,
-            DeployTracker tracker
+            DownloadTracker tracker
     ) {
         this.clusterService = clusterService;
         this.cmgManager = cmgManager;
@@ -168,7 +168,7 @@ public class DeployMessagingService {
     }
 
     private void processStopDeployRequest(StopDeployRequest request, String 
senderConsistentId, long correlationId) {
-        tracker.cancelIfDeploy(request.id(), 
Version.parseVersion(request.version()));
+        tracker.cancelIfDownloading(request.id(), 
Version.parseVersion(request.version()));
         clusterService.messagingService()
                 .respond(senderConsistentId, 
StopDeployResponseImpl.builder().build(), correlationId);
 
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 8f0d462066..2be1863a52 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -23,30 +23,28 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
 import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.OBSOLETE;
 import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.REMOVING;
-import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExistsException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitFailover;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
 import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
 import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
-import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -58,7 +56,6 @@ import org.jetbrains.annotations.Nullable;
  * Deployment manager implementation.
  */
 public class DeploymentManagerImpl implements IgniteDeployment {
-
     private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
 
     /**
@@ -99,7 +96,12 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
     /**
      * Deploy tracker.
      */
-    private final DeployTracker tracker;
+    private final DownloadTracker tracker;
+
+    /**
+     * Failover.
+     */
+    private final DeploymentUnitFailover failover;
 
     /**
      * Constructor.
@@ -110,45 +112,23 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
      * @param configuration Deployment configuration.
      * @param cmgManager Cluster management group manager.
      */
-    public DeploymentManagerImpl(ClusterService clusterService,
+    public DeploymentManagerImpl(
+            ClusterService clusterService,
             MetaStorageManager metaStorage,
+            LogicalTopologyService logicalTopology,
             Path workDir,
             DeploymentConfiguration configuration,
-            ClusterManagementGroupManager cmgManager) {
+            ClusterManagementGroupManager cmgManager
+    ) {
         this.clusterService = clusterService;
         this.configuration = configuration;
         this.cmgManager = cmgManager;
         this.workDir = workDir;
-        tracker = new DeployTracker();
+        tracker = new DownloadTracker();
         deployer = new FileDeployerService();
         messaging = new DeployMessagingService(clusterService, cmgManager, 
deployer, tracker);
         deploymentUnitStore = new DeploymentUnitStoreImpl(metaStorage);
-    }
-
-    private void onUnitRegister(UnitNodeStatus status, Set<String> 
deployedNodes) {
-        if (status.status() == UPLOADING) {
-            messaging.downloadUnitContent(status.id(), status.version(), new 
ArrayList<>(deployedNodes))
-                    .thenCompose(content -> deployer.deploy(status.id(), 
status.version(), content))
-                    .thenApply(deployed -> {
-                        if (deployed) {
-                            return deploymentUnitStore.updateNodeStatus(
-                                    getLocalNodeId(),
-                                    status.id(),
-                                    status.version(),
-                                    DEPLOYED);
-                        }
-                        return deployed;
-                    });
-        } else if (status.status() == DEPLOYED) {
-            deploymentUnitStore.getClusterStatus(status.id(), status.version())
-                    .thenApply(UnitClusterStatus::initialNodesToDeploy)
-                    .thenApply(deployedNodes::containsAll)
-                    .thenAccept(allRequiredDeployed -> {
-                        if (allRequiredDeployed) {
-                            
deploymentUnitStore.updateClusterStatus(status.id(), status.version(), 
DEPLOYED);
-                        }
-                    });
-        }
+        failover = new DeploymentUnitFailover(logicalTopology, 
deploymentUnitStore, clusterService);
     }
 
     @Override
@@ -185,7 +165,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
             LOG.error("Error reading deployment unit content", e);
             return failedFuture(e);
         }
-        return tracker.track(id, version, deployToLocalNode(id, version, 
unitContent)
+        return deployToLocalNode(id, version, unitContent)
                 .thenApply(completed -> {
                     if (completed) {
                         String localNodeId = getLocalNodeId();
@@ -196,8 +176,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                         }));
                     }
                     return completed;
-                })
-        );
+                });
     }
 
     private CompletableFuture<Boolean> deployToLocalNode(String id, Version 
version, UnitContent unitContent) {
@@ -322,12 +301,14 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
 
     @Override
     public void start() {
+        DefaultNodeCallback callback = new 
DefaultNodeCallback(deploymentUnitStore, messaging, deployer, clusterService, 
tracker);
         
deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value()));
         deploymentUnitStore.registerListener(new NodeStatusWatchListener(
                 deploymentUnitStore,
                 this::getLocalNodeId,
-                this::onUnitRegister));
+                callback));
         messaging.subscribe();
+        failover.registerTopologyChangeCallback(callback);
     }
 
     @Override
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
similarity index 68%
rename from 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
rename to 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
index f3b57ec44e..0bdc2f202c 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
@@ -20,19 +20,18 @@ package org.apache.ignite.internal.deployunit;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
-import org.apache.ignite.internal.future.InFlightFutures;
-import org.apache.ignite.lang.ByteArray;
 
 /**
  * Deploy actions tracker.
  */
-public class DeployTracker {
+public class DownloadTracker {
     /**
      * In flight futures tracker.
      */
-    private final Map<ByteArray, InFlightFutures> inFlightFutures = new 
ConcurrentHashMap<>();
+    private final Map<ClusterStatusKey, CompletableFuture<?>> inFlightFutures 
= new ConcurrentHashMap<>();
 
     /**
      * Track deploy action.
@@ -43,9 +42,9 @@ public class DeployTracker {
      * @param trackableAction Deploy action.
      * @return {@param trackableAction}.
      */
-    public <T> CompletableFuture<T> track(String id, Version version, 
CompletableFuture<T> trackableAction) {
-        ByteArray key = 
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
-        return inFlightFutures.computeIfAbsent(key, k -> new 
InFlightFutures()).registerFuture(trackableAction);
+    public <T> CompletableFuture<T> track(String id, Version version, 
Supplier<CompletableFuture<T>> trackableAction) {
+        ClusterStatusKey key = 
ClusterStatusKey.builder().id(id).version(version).build();
+        return (CompletableFuture<T>) inFlightFutures.computeIfAbsent(key, k 
-> trackableAction.get());
     }
 
     /**
@@ -54,10 +53,11 @@ public class DeployTracker {
      * @param id Deployment unit identifier.
      * @param version Deployment version identifier.
      */
-    public void cancelIfDeploy(String id, Version version) {
-        InFlightFutures futureTracker = 
inFlightFutures.get(ClusterStatusKey.builder().id(id).version(version).build().toByteArray());
-        if (futureTracker != null) {
-            futureTracker.cancelInFlightFutures();
+    public void cancelIfDownloading(String id, Version version) {
+        ClusterStatusKey key = 
ClusterStatusKey.builder().id(id).version(version).build();
+        CompletableFuture<?> future = inFlightFutures.remove(key);
+        if (future != null) {
+            future.cancel(true);
         }
     }
 
@@ -65,6 +65,6 @@ public class DeployTracker {
      * Cancel all deploy actions.
      */
     public void cancelAll() {
-        
inFlightFutures.values().forEach(InFlightFutures::cancelInFlightFutures);
+        inFlightFutures.values().forEach(future -> future.cancel(true));
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
new file mode 100644
index 0000000000..6ed476baa5
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
+import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
+
+import java.util.Objects;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment unit failover.
+ */
+public class DeploymentUnitFailover {
+    private final LogicalTopologyService logicalTopology;
+
+    private final DeploymentUnitStore deploymentUnitStore;
+
+    private final ClusterService clusterService;
+
+    /**
+     * Constructor.
+     *
+     * @param logicalTopology Logical topology service.
+     * @param deploymentUnitStore Deployment units store.
+     * @param clusterService Cluster service.
+     */
+    public DeploymentUnitFailover(
+            LogicalTopologyService logicalTopology,
+            DeploymentUnitStore deploymentUnitStore,
+            ClusterService clusterService
+    ) {
+        this.logicalTopology = logicalTopology;
+        this.deploymentUnitStore = deploymentUnitStore;
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Register {@link NodeEventCallback} as topology change callback.
+     *
+     * @param callback Node status callback.
+     */
+    public void registerTopologyChangeCallback(NodeEventCallback callback) {
+        logicalTopology.addEventListener(new LogicalTopologyEventListener() {
+            @Override
+            public void onNodeJoined(LogicalNode joinedNode, 
LogicalTopologySnapshot newTopology) {
+                String consistentId = joinedNode.name();
+                if (Objects.equals(consistentId, getLocalNodeId())) {
+                    deploymentUnitStore.getNodeStatuses(consistentId)
+                            .thenAccept(nodeStatuses -> 
nodeStatuses.forEach(nodeStatus -> {
+                                if (nodeStatus.status() == UPLOADING) {
+                                    
deploymentUnitStore.getClusterStatus(nodeStatus.id(), nodeStatus.version())
+                                            .thenAccept(clusterStatus -> {
+                                                if (clusterStatus.status() == 
UPLOADING || clusterStatus.status() == DEPLOYED) {
+                                                    
deploymentUnitStore.getAllNodes(nodeStatus.id(), nodeStatus.version())
+                                                            .thenAccept(nodes 
-> callback.onUploading(nodeStatus, nodes));
+                                                }
+                                            });
+                                }
+                            }));
+                }
+            }
+        });
+    }
+
+    private String getLocalNodeId() {
+        return clusterService.topologyService().localMember().name();
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
index 34cb08b8a5..e44731b88f 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
@@ -31,8 +31,6 @@ import 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
  * Metastore for deployment units.
  */
 public interface DeploymentUnitStore {
-
-
     /**
      * Register node statuses change events listener.
      *
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
index e023af0c84..a74c31be0c 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.internal.deployunit.metastore;
 
-import java.util.Set;
+import java.util.List;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 
 /**
  * Listener of deployment unit node status changes.
  */
-@FunctionalInterface
 public interface NodeEventCallback {
     /**
      * Change event.
@@ -31,5 +30,38 @@ public interface NodeEventCallback {
      * @param status Deployment unit status.
      * @param holders Nodes consistent id.
      */
-    void onUpdate(UnitNodeStatus status, Set<String> holders);
+    default void onUpdate(UnitNodeStatus status, List<String> holders) {
+        switch (status.status()) {
+            case UPLOADING:
+                onUploading(status, holders);
+                break;
+            case DEPLOYED:
+                onDeploy(status, holders);
+                break;
+            case REMOVING:
+                onRemoving(status, holders);
+                break;
+            case OBSOLETE:
+                onObsolete(status, holders);
+                break;
+            default:
+                break;
+        }
+    }
+
+    default void onUploading(UnitNodeStatus status, List<String> holders) {
+
+    }
+
+    default void onDeploy(UnitNodeStatus status, List<String> holders) {
+
+    }
+
+    default void onObsolete(UnitNodeStatus status, List<String> holders) {
+
+    }
+
+    default void onRemoving(UnitNodeStatus status, List<String> holders) {
+
+    }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
index 1390467242..67a2d684f3 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.deployunit.metastore;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
-import java.util.HashSet;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -45,7 +44,7 @@ public class NodeStatusWatchListener implements WatchListener 
{
 
     private final Supplier<String> localNodeProvider;
 
-    private final NodeEventCallback listener;
+    private final NodeEventCallback callback;
 
     private final ExecutorService executor = Executors.newFixedThreadPool(
             4, new NamedThreadFactory("NodeStatusWatchListener-pool", LOG));
@@ -62,7 +61,7 @@ public class NodeStatusWatchListener implements WatchListener 
{
             NodeEventCallback callback) {
         this.deploymentUnitStore = deploymentUnitStore;
         this.localNodeProvider = localNodeProvider;
-        this.listener = callback;
+        this.callback = callback;
     }
 
     @Override
@@ -84,7 +83,7 @@ public class NodeStatusWatchListener implements WatchListener 
{
 
             CompletableFuture.supplyAsync(() -> nodeStatus, executor)
                     .thenComposeAsync(status -> 
deploymentUnitStore.getAllNodes(status.id(), status.version()), executor)
-                    .thenAccept(nodes -> listener.onUpdate(nodeStatus, new 
HashSet<>(nodes)));
+                    .thenAccept(nodes -> callback.onUpdate(nodeStatus, nodes));
         }
         return completedFuture(null);
     }
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 51ed8200f2..5474c10613 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -63,7 +63,12 @@ public class DeploymentUnitStoreImplTest {
 
     private final List<UnitNodeStatus> history = 
Collections.synchronizedList(new ArrayList<>());
 
-    private final NodeEventCallback listener = (status, holders) -> 
history.add(status);
+    private final NodeEventCallback listener = new NodeEventCallback() {
+        @Override
+        public void onUpdate(UnitNodeStatus status, List<String> holders) {
+            history.add(status);
+        }
+    };
 
     private DeploymentUnitStoreImpl metastore;
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 7ae97e511a..d7204c3c3f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -119,35 +120,59 @@ public class Cluster {
         this.defaultNodeBootstrapConfigTemplate = 
defaultNodeBootstrapConfigTemplate;
     }
 
+    public void startAndInit(int nodeCount) {
+        startAndInit(nodeCount, new int[] { 0 });
+    }
+
     /**
      * Starts the cluster with the given number of nodes and initializes it.
      *
      * @param nodeCount Number of nodes in the cluster.
+     * @param cmgNodes Indices of CMG nodes.
      */
-    public void startAndInit(int nodeCount) {
-        startAndInit(nodeCount, builder -> {});
+    public void startAndInit(int nodeCount, int[] cmgNodes) {
+        startAndInit(nodeCount, cmgNodes, builder -> {});
     }
 
     /**
      * Starts the cluster with the given number of nodes and initializes it.
      *
      * @param nodeCount Number of nodes in the cluster.
+     * @param cmgNodes Indices of CMG nodes.
+     * @param initParametersConfigurator Configure {@link InitParameters} 
before initializing the cluster.
+     */
+    public void startAndInit(int nodeCount, int[] cmgNodes, 
Consumer<InitParametersBuilder> initParametersConfigurator) {
+        startAndInit(nodeCount, cmgNodes, defaultNodeBootstrapConfigTemplate, 
initParametersConfigurator);
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it 
with CMG on first node.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     * @param nodeBootstrapConfigTemplate Node bootstrap config template to be 
used for each node started
+     *     with this call.
      * @param initParametersConfigurator Configure {@link InitParameters} 
before initializing the cluster.
      */
-    public void startAndInit(int nodeCount, Consumer<InitParametersBuilder> 
initParametersConfigurator) {
-        startAndInit(nodeCount, defaultNodeBootstrapConfigTemplate, 
initParametersConfigurator);
+    public void startAndInit(
+            int nodeCount,
+            String nodeBootstrapConfigTemplate,
+            Consumer<InitParametersBuilder> initParametersConfigurator
+    ) {
+        startAndInit(nodeCount, new int[] { 0 }, nodeBootstrapConfigTemplate, 
initParametersConfigurator);
     }
 
     /**
      * Starts the cluster with the given number of nodes and initializes it.
      *
      * @param nodeCount Number of nodes in the cluster.
+     * @param cmgNodes Indices of CMG nodes.
      * @param nodeBootstrapConfigTemplate Node bootstrap config template to be 
used for each node started
      *     with this call.
      * @param initParametersConfigurator Configure {@link InitParameters} 
before initializing the cluster.
      */
     public void startAndInit(
             int nodeCount,
+            int[] cmgNodes,
             String nodeBootstrapConfigTemplate,
             Consumer<InitParametersBuilder> initParametersConfigurator
     ) {
@@ -159,11 +184,11 @@ public class Cluster {
                 .mapToObj(nodeIndex -> startNodeAsync(nodeIndex, 
nodeBootstrapConfigTemplate))
                 .collect(toList());
 
-        String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+        List<String> metaStorageAndCmgNodeNames = 
Arrays.stream(cmgNodes).mapToObj(i -> testNodeName(testInfo, 
i)).collect(toList());
 
         InitParametersBuilder builder = InitParameters.builder()
-                .destinationNodeName(metaStorageAndCmgNodeName)
-                .metaStorageNodeNames(List.of(metaStorageAndCmgNodeName))
+                .destinationNodeName(metaStorageAndCmgNodeNames.get(0))
+                .metaStorageNodeNames(metaStorageAndCmgNodeNames)
                 .clusterName("cluster");
 
         initParametersConfigurator.accept(builder);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 1f0132e21c..82cd58351f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -108,7 +108,7 @@ public abstract class ClusterPerTestIntegrationTest extends 
IgniteIntegrationTes
         cluster = new Cluster(testInfo, workDir, 
getNodeBootstrapConfigTemplate());
 
         if (initialNodes() > 0) {
-            cluster.startAndInit(initialNodes());
+            cluster.startAndInit(initialNodes(), cmgMetastoreNodes());
         }
     }
 
@@ -135,6 +135,10 @@ public abstract class ClusterPerTestIntegrationTest 
extends IgniteIntegrationTes
         return 3;
     }
 
+    /**
+     * Returns indices of the nodes that are passed to {@code cluster.startAndInit} as CMG nodes when the cluster is started.
+     *
+     * @return Node indices; only node {@code 0} unless a subclass overrides this method.
+     */
+    protected int[] cmgMetastoreNodes() {
+        return new int[] { 0 };
+    }
+
     /**
      * Returns node bootstrap config template.
      *
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
new file mode 100644
index 0000000000..5ba85fb712
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Describes a dummy file used as deployment unit content in integration tests.
+ *
+ * <p>The backing file is generated on construction when nothing exists at the given path.
+ */
+class DeployFile {
+    private final Path file;
+
+    private final long expectedSize;
+
+    private final int replicaTimeout;
+
+    /**
+     * Stores the descriptor fields and generates the file if it does not exist.
+     *
+     * @param file Location of the file.
+     * @param expectedSize Size handed to the dummy file generator.
+     * @param replicaTimeout Replica timeout associated with the file; the unit is not defined in this class.
+     * @throws IOException If the dummy file cannot be generated.
+     */
+    DeployFile(Path file, long expectedSize, int replicaTimeout) throws IOException {
+        this.file = file;
+        this.expectedSize = expectedSize;
+        this.replicaTimeout = replicaTimeout;
+
+        // A file that is already present is reused untouched; its size is not verified here.
+        boolean alreadyPresent = Files.exists(file);
+        if (!alreadyPresent) {
+            IgniteUtils.fillDummyFile(file, expectedSize);
+        }
+    }
+
+    /** Returns the location of the file. */
+    Path file() {
+        return file;
+    }
+
+    /** Returns the size the file was requested with. */
+    long expectedSize() {
+        return expectedSize;
+    }
+
+    /** Returns the replica timeout associated with the file. */
+    int replicaTimeout() {
+        return replicaTimeout;
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
new file mode 100644
index 0000000000..6abeb4bb6d
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.UnitStatuses;
+import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
+
+/**
+ * Test helper that lazily creates dummy files of three sizes inside a working directory and deploys them as units.
+ */
+class DeployFiles {
+    /**
+     * Base replica timeout; the medium and big files use two and three times this value.
+     * NOTE(review): the unit is not visible in this class, presumably seconds - confirm against the code that awaits on it.
+     */
+    private static final int BASE_REPLICA_TIMEOUT = 30;
+
+    private static final long SMALL_IN_BYTES = 1024L;
+
+    private static final long MEDIUM_IN_BYTES = 1024L * 1024L;
+
+    private static final long BIG_IN_BYTES = 100 * 1024L * 1024L;
+
+    /** Directory in which the dummy files are created; it is also handed to every created {@link Unit}. */
+    private final Path workDir;
+
+    private DeployFile smallFile;
+
+    private DeployFile mediumFile;
+
+    private DeployFile bigFile;
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-19009
+    DeployFiles(Path workDir) {
+        this.workDir = workDir;
+    }
+
+    /** Creates a {@link DeployFile}, rethrowing the checked {@link IOException} as a {@link RuntimeException}. */
+    private static DeployFile create(Path path, long size, int replicaTimeout) {
+        try {
+            return new DeployFile(path, size, replicaTimeout);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the {@code small.txt} file, creating it on first access. */
+    DeployFile smallFile() {
+        if (smallFile == null) {
+            smallFile = create(workDir.resolve("small.txt"), SMALL_IN_BYTES, BASE_REPLICA_TIMEOUT);
+        }
+        return smallFile;
+    }
+
+    /** Returns the {@code medium.txt} file, creating it on first access. */
+    DeployFile mediumFile() {
+        if (mediumFile == null) {
+            mediumFile = create(workDir.resolve("medium.txt"), MEDIUM_IN_BYTES, BASE_REPLICA_TIMEOUT * 2);
+        }
+        return mediumFile;
+    }
+
+    /** Returns the {@code big.txt} file, creating it on first access. */
+    DeployFile bigFile() {
+        if (bigFile == null) {
+            bigFile = create(workDir.resolve("big.txt"), BIG_IN_BYTES, BASE_REPLICA_TIMEOUT * 3);
+        }
+        return bigFile;
+    }
+
+    /** Deploys a single file without the force flag. */
+    private Unit deployAndVerify(String id, Version version, DeployFile file, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, false, file, entryNode);
+    }
+
+    /** Deploys a single file with the given force flag. */
+    private Unit deployAndVerify(String id, Version version, boolean force, DeployFile file, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, force, List.of(file), entryNode);
+    }
+
+    /**
+     * Deploys the files as one unit through the entry node and checks that they appeared in that node's unit directory.
+     *
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @param force Force flag passed to {@code deployAsync}.
+     * @param files Files that make up the unit content.
+     * @param entryNode Node whose deployment API receives the unit.
+     * @return Descriptor of the deployed unit.
+     */
+    public Unit deployAndVerify(String id, Version version, boolean force, List<DeployFile> files, IgniteImpl entryNode) {
+        List<Path> paths = files.stream()
+                .map(DeployFile::file)
+                .collect(Collectors.toList());
+
+        CompletableFuture<Boolean> deploy = entryNode.deployment()
+                .deployAsync(id, version, force, fromPaths(paths));
+
+        assertThat(deploy, willBe(true));
+
+        Unit unit = new Unit(entryNode, workDir, id, version, files);
+
+        Path nodeUnitDirectory = unit.getNodeUnitDirectory(entryNode);
+
+        for (DeployFile file : files) {
+            Path filePath = nodeUnitDirectory.resolve(file.file().getFileName());
+            assertTrue(Files.exists(filePath), "Deployed file is missing on the entry node: " + filePath);
+        }
+
+        return unit;
+    }
+
+    /** Deploys the small file as a unit through the entry node and verifies it. */
+    public Unit deployAndVerifySmall(String id, Version version, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, smallFile(), entryNode);
+    }
+
+    /** Deploys the medium file as a unit through the entry node and verifies it. */
+    public Unit deployAndVerifyMedium(String id, Version version, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, mediumFile(), entryNode);
+    }
+
+    /** Deploys the big file as a unit through the entry node and verifies it. */
+    public Unit deployAndVerifyBig(String id, Version version, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, bigFile(), entryNode);
+    }
+
+    /**
+     * Builds the statuses object for the given unit id in which the version of every passed unit is {@code DEPLOYED}.
+     *
+     * @param id Unit identifier.
+     * @param units Units whose versions are appended, in the given order.
+     * @return Built statuses.
+     */
+    public static UnitStatuses buildStatus(String id, Unit... units) {
+        UnitStatusesBuilder builder = UnitStatuses.builder(id);
+        for (Unit unit : units) {
+            builder.append(unit.version(), DEPLOYED);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Opens every path and exposes the streams as a deployment unit keyed by file name.
+     *
+     * @param paths Files to open.
+     * @return Deployment unit backed by the opened streams.
+     * @throws RuntimeException If any of the files cannot be opened; streams opened before the failure are closed first.
+     */
+    private static DeploymentUnit fromPaths(List<Path> paths) {
+        Objects.requireNonNull(paths);
+        Map<String, InputStream> map = new HashMap<>();
+        try {
+            for (Path path : paths) {
+                map.put(path.getFileName().toString(), Files.newInputStream(path));
+            }
+        } catch (IOException e) {
+            // Do not leak the streams that were opened before the failing one.
+            for (InputStream opened : map.values()) {
+                try {
+                    opened.close();
+                } catch (IOException closeError) {
+                    e.addSuppressed(closeError);
+                }
+            }
+            throw new RuntimeException(e);
+        }
+        return () -> map;
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
new file mode 100644
index 0000000000..7275899667
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.deployunit.IgniteDeployment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for the recovery logic of {@link IgniteDeployment} when a node is stopped and started again.
+ */
+public class ItDeploymentUnitFailoverTest extends 
ClusterPerTestIntegrationTest {
+    /** Index of the node that the test stops and later starts again; it is one of the CMG nodes. */
+    private static final int RESTARTED_NODE_INDEX = 8;
+
+    private DeployFiles files;
+
+    @BeforeEach
+    public void generateDummy() {
+        files = new DeployFiles(workDir);
+    }
+
+    @Override
+    protected int initialNodes() {
+        return 9;
+    }
+
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        return new int[] {6, 7, RESTARTED_NODE_INDEX};
+    }
+
+    @Test
+    public void testDeployWithNodeStop() {
+        // Take the handle before the node is stopped; it is still used below to address that node.
+        IgniteImpl stoppedCmgNode = cluster.node(RESTARTED_NODE_INDEX);
+        Version version = Version.parseVersion("1.0.0");
+        Unit bigUnit = files.deployAndVerifyBig("id1", version, cluster.node(3));
+
+        stopNode(RESTARTED_NODE_INDEX);
+
+        bigUnit.waitUnitClean(stoppedCmgNode);
+        bigUnit.waitUnitReplica(cluster.node(6));
+        bigUnit.waitUnitReplica(cluster.node(7));
+
+        // After the restart the unit is expected on the node again.
+        IgniteImpl restartedCmgNode = startNode(RESTARTED_NODE_INDEX);
+        bigUnit.waitUnitReplica(restartedCmgNode);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index b1251c2d6e..6931cebc11 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployment;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.deployment.DeployFiles.buildStatus;
 import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
 import static 
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -26,29 +27,16 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.deployunit.DeploymentUnit;
 import org.apache.ignite.internal.deployunit.IgniteDeployment;
 import org.apache.ignite.internal.deployunit.UnitStatuses;
-import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
-import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
 import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -56,44 +44,21 @@ import org.junit.jupiter.api.Test;
  * Integration tests for {@link IgniteDeployment}.
  */
 public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
-    private static final int BASE_REPLICA_TIMEOUT = 30;
+    private DeployFiles files;
 
-    private static final long SMALL_IN_BYTES = 1024L;
-
-    private static final long MEDIUM_IN_BYTES = 1024L * 1024L;
-
-    private static final long BIG_IN_BYTES = 1024L * 1024L * 1024L;
-
-    private DeployFile smallFile;
-
-    private DeployFile mediumFile;
-
-    private DeployFile bigFile;
-
-    private List<DeployFile> allFiles;
 
     @BeforeEach
-    public void generateDummy() throws IOException {
-        smallFile = create("small.txt", SMALL_IN_BYTES, BASE_REPLICA_TIMEOUT);
-        mediumFile = create("medium.txt", MEDIUM_IN_BYTES, 
BASE_REPLICA_TIMEOUT * 2);
-        // TODO https://issues.apache.org/jira/browse/IGNITE-19009
-        // bigFile = create("big.txt", BIG_IN_BYTES, BASE_REPLICA_TIMEOUT * 3);
-        allFiles = List.of(smallFile, mediumFile);
-    }
-
-    private DeployFile create(String name, long size, int replicaTimeout) 
throws IOException {
-        DeployFile deployFile = new DeployFile(workDir.resolve(name), size, 
replicaTimeout);
-        deployFile.ensureExists();
-        return deployFile;
+    public void generateDummy() {
+        files = new DeployFiles(workDir);
     }
 
     @Test
     public void testDeploy() {
         String id = "test";
-        Unit unit = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 1);
+        Unit unit = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), cluster.node(1));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, unit);
+        unit.waitUnitReplica(cmg);
 
         UnitStatuses status = buildStatus(id, unit);
 
@@ -105,10 +70,15 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     @Test
     public void deployDirectory() {
         String id = "test";
-        Unit unit = deployAndVerify(id, Version.parseVersion("1.1.0"), false, 
allFiles, 1);
+        Unit unit = files.deployAndVerify(id,
+                Version.parseVersion("1.1.0"),
+                false,
+                List.of(files.smallFile(), files.mediumFile()),
+                cluster.node(1)
+        );
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, unit);
+        unit.waitUnitReplica(cmg);
 
         UnitStatuses status = buildStatus(id, unit);
 
@@ -119,13 +89,13 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
 
     @Test
     public void testDeployUndeploy() {
-        Unit unit = deployAndVerifySmall("test", 
Version.parseVersion("1.1.0"), 1);
+        Unit unit = files.deployAndVerifySmall("test", 
Version.parseVersion("1.1.0"), cluster.node(1));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, unit);
+        unit.waitUnitReplica(cmg);
 
         unit.undeploy();
-        waitUnitClean(cmg, unit);
+        unit.waitUnitClean(cmg);
 
         CompletableFuture<List<UnitStatuses>> list = 
node(2).deployment().clusterStatusesAsync();
         assertThat(list, willBe(Collections.emptyList()));
@@ -134,12 +104,12 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     @Test
     public void testDeployTwoUnits() {
         String id = "test";
-        Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 
1);
-        Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"), 
2);
+        Unit unit1 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), cluster.node(1));
+        Unit unit2 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.1"), cluster.node(2));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, unit1);
-        waitUnitReplica(cmg, unit2);
+        unit1.waitUnitReplica(cmg);
+        unit2.waitUnitReplica(cmg);
 
         UnitStatuses status = buildStatus(id, unit1, unit2);
 
@@ -147,19 +117,19 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
                 .pollDelay(100, MILLISECONDS)
                 .until(() -> node(2).deployment().clusterStatusesAsync(id), 
willBe(status));
 
-        CompletableFuture<List<Version>> versions = 
node(2).deployment().versionsAsync(unit1.id);
-        assertThat(versions, willBe(List.of(unit1.version, unit2.version)));
+        CompletableFuture<List<Version>> versions = 
node(2).deployment().versionsAsync(unit1.id());
+        assertThat(versions, willBe(List.of(unit1.version(), 
unit2.version())));
     }
 
     @Test
     public void testDeployTwoUnitsAndUndeployOne() {
         String id = "test";
-        Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 
1);
-        Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"), 
2);
+        Unit unit1 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.0"), cluster.node(1));
+        Unit unit2 = files.deployAndVerifySmall(id, 
Version.parseVersion("1.1.1"), cluster.node(2));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, unit1);
-        waitUnitReplica(cmg, unit2);
+        unit1.waitUnitReplica(cmg);
+        unit2.waitUnitReplica(cmg);
 
         UnitStatuses status = buildStatus(id, unit1, unit2);
 
@@ -168,21 +138,21 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
                 .until(() -> node(2).deployment().clusterStatusesAsync(id), 
willBe(status));
 
         unit2.undeploy();
-        CompletableFuture<List<Version>> newVersions = 
node(2).deployment().versionsAsync(unit1.id);
-        assertThat(newVersions, willBe(List.of(unit1.version)));
+        CompletableFuture<List<Version>> newVersions = 
node(2).deployment().versionsAsync(unit1.id());
+        assertThat(newVersions, willBe(List.of(unit1.version())));
     }
 
     @Test
     public void testDeploymentStatus() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit unit = deployAndVerifyMedium(id, version, 1);
+        Unit unit = files.deployAndVerifyMedium(id, version, cluster.node(1));
 
         CompletableFuture<DeploymentStatus> status = 
node(2).deployment().clusterStatusAsync(id, version);
         assertThat(status, willBe(UPLOADING));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, unit);
+        unit.waitUnitReplica(cmg);
 
         await().timeout(2, SECONDS)
                 .pollDelay(300, MILLISECONDS)
@@ -190,8 +160,8 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
 
         assertThat(unit.undeployAsync(), willSucceedFast());
 
-        waitUnitClean(unit.deployedNode, unit);
-        waitUnitClean(cmg, unit);
+        unit.waitUnitClean(unit.deployedNode());
+        unit.waitUnitClean(cmg);
 
         assertThat(node(2).deployment().clusterStatusAsync(id, version), 
willBe(nullValue()));
     }
@@ -200,223 +170,59 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     public void testRedeploy() {
         String id = "test";
         String version = "1.1.0";
-        Unit smallUnit = deployAndVerify(id, Version.parseVersion(version), 
smallFile, 1);
+        Unit smallUnit = files.deployAndVerifySmall(id, 
Version.parseVersion(version), cluster.node(1));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, smallUnit);
+        smallUnit.waitUnitReplica(cmg);
 
-        Unit mediumUnit = deployAndVerify(id, Version.parseVersion(version), 
true, mediumFile, 1);
-        waitUnitReplica(cmg, mediumUnit);
+        Unit mediumUnit = files.deployAndVerify(id, 
Version.parseVersion(version), true, List.of(files.mediumFile()), 
cluster.node(1));
+        mediumUnit.waitUnitReplica(cmg);
 
-        waitUnitClean(smallUnit.deployedNode, smallUnit);
-        waitUnitClean(cmg, smallUnit);
+        smallUnit.waitUnitClean(smallUnit.deployedNode());
+        smallUnit.waitUnitClean(cmg);
     }
 
     @Test
     public void testOnDemandDeploy() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit smallUnit = deployAndVerify(id, version, smallFile, 1);
+        Unit smallUnit = files.deployAndVerifySmall(id, version, 
cluster.node(1));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, smallUnit);
+        smallUnit.waitUnitReplica(cmg);
 
         IgniteImpl onDemandDeployNode = cluster.node(2);
         CompletableFuture<Boolean> onDemandDeploy = 
onDemandDeployNode.deployment().onDemandDeploy(id, version);
 
         assertThat(onDemandDeploy, willBe(true));
-        waitUnitReplica(onDemandDeployNode, smallUnit);
+        smallUnit.waitUnitReplica(onDemandDeployNode);
     }
 
     @Test
     public void testOnDemandDeployToDeployedNode() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit smallUnit = deployAndVerify(id, version, smallFile, 1);
+        Unit smallUnit = files.deployAndVerifySmall(id, version, 
cluster.node(1));
 
         IgniteImpl cmg = cluster.node(0);
-        waitUnitReplica(cmg, smallUnit);
+        smallUnit.waitUnitReplica(cmg);
 
         IgniteImpl onDemandDeployNode = cluster.node(1);
         CompletableFuture<Boolean> onDemandDeploy = 
onDemandDeployNode.deployment().onDemandDeploy(id, version);
 
         assertThat(onDemandDeploy, willBe(true));
-        waitUnitReplica(onDemandDeployNode, smallUnit);
+        smallUnit.waitUnitReplica(onDemandDeployNode);
     }
 
     @Test
     public void testDeployToCmg() {
         String id = "test";
         Version version = Version.parseVersion("1.1.0");
-        Unit smallUnit = deployAndVerify(id, version, smallFile, 0);
+        Unit smallUnit = files.deployAndVerifySmall(id, version, 
cluster.node(0));
 
         await().untilAsserted(() -> {
             CompletableFuture<List<UnitStatuses>> list = 
node(0).deployment().clusterStatusesAsync();
             assertThat(list, 
willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
         });
     }
-
-    private UnitStatuses buildStatus(String id, Unit... units) {
-        UnitStatusesBuilder builder = UnitStatuses.builder(id);
-        for (Unit unit : units) {
-            builder.append(unit.version, DEPLOYED);
-        }
-
-        return builder.build();
-    }
-
-    private Unit deployAndVerify(String id, Version version, DeployFile file, 
int nodeIndex) {
-        return deployAndVerify(id, version, false, file, nodeIndex);
-    }
-
-    private Unit deployAndVerify(String id, Version version, boolean force, 
DeployFile file, int nodeIndex) {
-        return deployAndVerify(id, version, force, List.of(file), nodeIndex);
-    }
-
-    private Unit deployAndVerify(String id, Version version, boolean force, 
List<DeployFile> files, int nodeIndex) {
-        IgniteImpl entryNode = node(nodeIndex);
-
-        List<Path> paths = files.stream()
-                .map(deployFile -> deployFile.file)
-                .collect(Collectors.toList());
-
-        CompletableFuture<Boolean> deploy = entryNode.deployment()
-                .deployAsync(id, version, force, fromPaths(paths));
-
-        assertThat(deploy, willBe(true));
-
-        Unit unit = new Unit(entryNode, id, version, files);
-
-        Path nodeUnitDirectory = getNodeUnitDirectory(entryNode, id, version);
-
-        for (DeployFile file : files) {
-            Path filePath = nodeUnitDirectory.resolve(file.file.getFileName());
-            assertTrue(Files.exists(filePath));
-        }
-
-        return unit;
-    }
-
-    private Unit deployAndVerifySmall(String id, Version version, int 
nodeIndex) {
-        return deployAndVerify(id, version, smallFile, nodeIndex);
-    }
-
-    private Unit deployAndVerifyMedium(String id, Version version, int 
nodeIndex) {
-        return deployAndVerify(id, version, mediumFile, nodeIndex);
-    }
-
-    private Unit deployAndVerifyBig(String id, Version version, int nodeIndex) 
{
-        return deployAndVerify(id, version, bigFile, nodeIndex);
-    }
-
-    private Path getNodeUnitDirectory(IgniteImpl node, String unitId, Version 
unitVersion) {
-        String deploymentFolder = node.nodeConfiguration()
-                .getConfiguration(DeploymentConfiguration.KEY)
-                .deploymentLocation().value();
-        Path resolve = workDir.resolve(node.name()).resolve(deploymentFolder);
-        return resolve.resolve(unitId)
-                .resolve(unitVersion.render());
-    }
-
-    private void waitUnitReplica(IgniteImpl ignite, Unit unit) {
-        Path unitDirectory = getNodeUnitDirectory(ignite, unit.id, 
unit.version);
-
-        int combinedTimeout = unit.files.stream().map(file -> 
file.replicaTimeout).reduce(Integer::sum).get();
-
-        await().timeout(combinedTimeout, SECONDS)
-                .pollDelay(1, SECONDS)
-                .ignoreException(IOException.class)
-                .until(() -> {
-                    for (DeployFile file : unit.files) {
-                        Path filePath = 
unitDirectory.resolve(file.file.getFileName());
-                        if (Files.notExists(filePath) || Files.size(filePath) 
!= file.expectedSize) {
-                            return false;
-                        }
-                    }
-
-                    return true;
-                });
-    }
-
-    private void waitUnitClean(IgniteImpl ignite, Unit unit) {
-        Path unitDirectory = getNodeUnitDirectory(ignite, unit.id, 
unit.version);
-
-        int combinedTimeout = unit.files.stream().map(file -> 
file.replicaTimeout).reduce(Integer::sum).get();
-
-        await().timeout(combinedTimeout, SECONDS)
-                .pollDelay(2, SECONDS)
-                .until(() -> {
-                    for (DeployFile file : unit.files) {
-                        Path filePath = 
unitDirectory.resolve(file.file.getFileName());
-                        if (Files.exists(filePath)) {
-                            return false;
-                        }
-                    }
-
-                    return true;
-                });
-    }
-
-    class Unit {
-        private final IgniteImpl deployedNode;
-
-        private final String id;
-
-        private final Version version;
-
-        private final List<DeployFile> files;
-
-        Unit(IgniteImpl deployedNode, String id, Version version, 
List<DeployFile> files) {
-            this.deployedNode = deployedNode;
-            this.id = id;
-            this.version = version;
-            this.files = files;
-        }
-
-        CompletableFuture<Boolean> undeployAsync() {
-            return deployedNode.deployment().undeployAsync(id, version);
-        }
-
-        void undeploy() {
-            deployedNode.deployment().undeployAsync(id, version);
-            waitUnitClean(deployedNode, this);
-        }
-    }
-
-    private static class DeployFile {
-        private final Path file;
-
-        private final long expectedSize;
-
-        private final int replicaTimeout;
-
-        private DeployFile(Path file, long expectedSize, int replicaTimeout) {
-            this.file = file;
-            this.expectedSize = expectedSize;
-            this.replicaTimeout = replicaTimeout;
-        }
-
-        public void ensureExists() throws IOException {
-            ensureFile(file, expectedSize);
-        }
-
-        private static void ensureFile(Path path, long size) throws 
IOException {
-            if (!Files.exists(path)) {
-                IgniteUtils.fillDummyFile(path, size);
-            }
-        }
-    }
-
-    private static DeploymentUnit fromPaths(List<Path> paths) {
-        Objects.requireNonNull(paths);
-        Map<String, InputStream> map = new HashMap<>();
-        try {
-            for (Path path : paths) {
-                map.put(path.getFileName().toString(), 
Files.newInputStream(path));
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return () -> map;
-    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
new file mode 100644
index 0000000000..9253c27486
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+
+/**
+ * Test helper describing one deployment unit: its id, its version, its files and the node whose deployment API is used to undeploy it.
+ * It also resolves the unit directory of a node under the test work directory and waits for the unit files to appear there or to vanish.
+ */
+class Unit {
+    private final IgniteImpl deployedNode;
+
+    private final String id;
+
+    private final Version version;
+
+    private final List<DeployFile> files;
+
+    private final Path workDir;
+
+    /**
+     * Creates a unit descriptor.
+     *
+     * @param deployedNode Node whose deployment API is used by {@link #undeployAsync()} and {@link #undeploy()}.
+     * @param workDir Root directory under which per-node directories are resolved.
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @param files Files the unit consists of.
+     */
+    Unit(IgniteImpl deployedNode, Path workDir, String id, Version version, List<DeployFile> files) {
+        this.deployedNode = deployedNode;
+        this.workDir = workDir;
+        this.id = id;
+        this.version = version;
+        this.files = files;
+    }
+
+    /** Returns the unit identifier. */
+    public String id() {
+        return id;
+    }
+
+    /** Returns the unit version. */
+    public Version version() {
+        return version;
+    }
+
+    /** Returns the files of this unit (the same list instance that was passed to the constructor). */
+    public List<DeployFile> files() {
+        return files;
+    }
+
+    /** Returns the node this descriptor was created with. */
+    IgniteImpl deployedNode() {
+        return deployedNode;
+    }
+
+    /**
+     * Requests undeployment of this unit through {@link #deployedNode()}.
+     *
+     * @return Future returned by the node's deployment API.
+     */
+    CompletableFuture<Boolean> undeployAsync() {
+        return deployedNode.deployment().undeployAsync(id, version);
+    }
+
+    /**
+     * Requests undeployment of this unit and blocks until its files are gone from the unit directory of {@link #deployedNode()}.
+     */
+    void undeploy() {
+        // The returned future is not awaited; completion is detected by polling the unit directory below.
+        deployedNode.deployment().undeployAsync(id, version);
+        waitUnitClean(deployedNode);
+    }
+
+    /**
+     * Waits until none of the unit files exists in the unit directory of the given node.
+     *
+     * <p>The wait is bounded by the sum of the files' replica timeouts (in seconds); the first check happens after a 2 second delay.
+     * A unit without files has nothing to wait for, so the method returns immediately in that case.
+     *
+     * @param ignite Node whose unit directory is inspected.
+     */
+    void waitUnitClean(IgniteImpl ignite) {
+        if (files.isEmpty()) {
+            return;
+        }
+
+        Path unitDirectory = getNodeUnitDirectory(ignite);
+
+        await().timeout(combinedTimeoutSeconds(), SECONDS)
+                .pollDelay(2, SECONDS)
+                .until(() -> allFilesRemoved(unitDirectory));
+    }
+
+    /**
+     * Resolves the directory of this unit for the given node:
+     * {@code <workDir>/<node name>/<configured deployment location>/<unit id>/<rendered version>}.
+     *
+     * @param ignite Node whose name and deployment configuration are used.
+     * @return Path of the unit directory for that node.
+     */
+    Path getNodeUnitDirectory(IgniteImpl ignite) {
+        String deploymentFolder = ignite.nodeConfiguration()
+                .getConfiguration(DeploymentConfiguration.KEY)
+                .deploymentLocation().value();
+        return workDir
+                .resolve(ignite.name())
+                .resolve(deploymentFolder)
+                .resolve(id)
+                .resolve(version.render());
+    }
+
+    /** Returns the unit directory for {@link #deployedNode()}. */
+    public Path getNodeUnitDirectory() {
+        return getNodeUnitDirectory(deployedNode);
+    }
+
+    /**
+     * Waits until every unit file exists in the unit directory of the given node and has its expected size.
+     *
+     * <p>The wait is bounded by the sum of the files' replica timeouts (in seconds); the first check happens after a 1 second delay.
+     * {@link IOException}s raised while checking are ignored and the check is retried.
+     * A unit without files has nothing to wait for, so the method returns immediately in that case.
+     *
+     * @param ignite Node whose unit directory is inspected.
+     */
+    void waitUnitReplica(IgniteImpl ignite) {
+        if (files.isEmpty()) {
+            return;
+        }
+
+        Path unitDirectory = getNodeUnitDirectory(ignite);
+
+        await().timeout(combinedTimeoutSeconds(), SECONDS)
+                .pollDelay(1, SECONDS)
+                .ignoreException(IOException.class)
+                .until(() -> allFilesReplicated(unitDirectory));
+    }
+
+    /** Sums the replica timeouts of all unit files; the wait methods use the result as a number of seconds. */
+    private int combinedTimeoutSeconds() {
+        return files.stream().mapToInt(DeployFile::replicaTimeout).sum();
+    }
+
+    /** Returns {@code true} when no unit file is present in the given directory. */
+    private boolean allFilesRemoved(Path unitDirectory) {
+        for (DeployFile file : files) {
+            if (Files.exists(unitDirectory.resolve(file.file().getFileName()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /** Returns {@code true} when every unit file is present in the given directory with its expected size. */
+    private boolean allFilesReplicated(Path unitDirectory) throws IOException {
+        for (DeployFile file : files) {
+            Path filePath = unitDirectory.resolve(file.file().getFileName());
+
+            if (Files.notExists(filePath) || Files.size(filePath) != file.expectedSize()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
index 3bc2568282..884629ec79 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
@@ -70,7 +70,7 @@ class ItJdbcTest extends IgniteIntegrationTest {
         @BeforeAll
         void setUp(TestInfo testInfo, @WorkDirectory Path workDir) {
             cluster = new Cluster(testInfo, workDir);
-            cluster.startAndInit(1, builder -> builder.clusterConfiguration(
+            cluster.startAndInit(1, new int[]{ 0 }, builder -> 
builder.clusterConfiguration(
                     "{\n"
                             + "  \"security\": {\n"
                             + "    \"authentication\": {\n"
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d937915332..74448f0f8f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -531,6 +531,7 @@ public class IgniteImpl implements Ignite {
         deploymentManager = new DeploymentManagerImpl(
                 clusterSvc,
                 metaStorageMgr,
+                logicalTopologyService,
                 workDir,
                 
nodeConfigRegistry.getConfiguration(DeploymentConfiguration.KEY),
                 cmgMgr

Reply via email to