This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bde0357dd0 IGNITE-18973 Introduce unit statuses (#1809)
bde0357dd0 is described below

commit bde0357dd0f72cea773e4f3663fe3ca47df51de5
Author: Mikhail <[email protected]>
AuthorDate: Fri Mar 24 17:09:05 2023 +0300

    IGNITE-18973 Introduce unit statuses (#1809)
---
 .../apache/ignite/deployment/DeploymentInfo.java   | 115 +++++++++
 .../ignite/deployment/DeploymentStatus.java}       |  29 ++-
 .../apache/ignite/deployment/IgniteDeployment.java |  11 -
 .../org/apache/ignite/deployment/UnitStatus.java   |  36 +--
 .../deployunit/DeployMessagingService.java         | 232 ++++++++++++++++++
 .../deployunit/DeployMetastoreService.java         | 133 ++++++++++
 .../ignite/internal/deployunit/DeployTracker.java  |  70 ++++++
 .../internal/deployunit/DeploymentManagerImpl.java | 244 +++++--------------
 .../internal/deployunit/FileDeployerService.java   | 113 +++++++++
 .../ignite/internal/deployunit/UnitMeta.java       |  32 ++-
 .../ignite/internal/deployunit/key/UnitKey.java    |   5 +-
 .../deployunit/key/UnitMetaSerializer.java         |   8 +-
 .../deployunit/message/DeployUnitMessageTypes.java |   4 +
 .../deployunit/message/DeployUnitResponse.java     |   8 +-
 ...loyUnitResponse.java => StopDeployRequest.java} |  21 +-
 ...oyUnitResponse.java => StopDeployResponse.java} |  16 +-
 .../deployunit/message/UndeployUnitResponse.java   |   8 -
 .../deployunit/metastore/ListAccumulator.java      |  58 +++++
 .../metastore/UnitStatusAccumulator.java           |   9 +-
 .../deployunit/metastore/UnitsAccumulator.java     |   7 +-
 .../ignite/deployment/UnitMetaSerializerTest.java  |  11 +-
 .../ignite/internal/future/InFlightFutures.java    |   3 +-
 .../deployment/DeploymentManagementController.java |   2 +-
 modules/runner/build.gradle                        |   1 +
 .../internal/deployment/ItDeploymentUnitTest.java  | 269 +++++++++++++++------
 25 files changed, 1104 insertions(+), 341 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java 
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java
new file mode 100644
index 0000000000..68b909285e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Data class with deployment unit information.
+ */
+public class DeploymentInfo {
+    private final DeploymentStatus status;
+
+    private final List<String> consistentIds;
+
+    private DeploymentInfo(DeploymentStatus status, List<String> 
consistentIds) {
+        this.status = status;
+        this.consistentIds = Collections.unmodifiableList(consistentIds);
+    }
+
+    public DeploymentStatus status() {
+        return status;
+    }
+
+    public List<String> consistentIds() {
+        return consistentIds;
+    }
+
+    public static DeploymentInfoBuilder builder() {
+        return new DeploymentInfoBuilder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DeploymentInfo that = (DeploymentInfo) o;
+
+        return status == that.status
+                && (consistentIds != null ? 
consistentIds.equals(that.consistentIds) : that.consistentIds == null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = status != null ? status.hashCode() : 0;
+        result = 31 * result + (consistentIds != null ? 
consistentIds.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "DeploymentInfo{"
+                + "status=" + status
+                + ", consistentIds=[" + String.join(", ", consistentIds) + "]"
+                + '}';
+    }
+
+    /**
+     * Builder for {@link DeploymentInfo}.
+     */
+    public static final class DeploymentInfoBuilder {
+        private DeploymentStatus status;
+        private final List<String> consistentIds = new ArrayList<>();
+
+        public DeploymentInfoBuilder status(DeploymentStatus status) {
+            this.status = status;
+            return this;
+        }
+
+        public DeploymentInfoBuilder addConsistentId(String consistentId) {
+            consistentIds.add(consistentId);
+            return this;
+        }
+
+        public DeploymentInfoBuilder addConsistentIds(Collection<String> 
consistentIds) {
+            this.consistentIds.addAll(consistentIds);
+            return this;
+        }
+
+        /**
+         * Build {@link DeploymentInfo} instance.
+         *
+         * @return {@link DeploymentInfo} instance.
+         */
+        public DeploymentInfo build() {
+            Objects.requireNonNull(status);
+
+            return new DeploymentInfo(status, consistentIds);
+        }
+
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
 b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java
similarity index 60%
copy from 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
copy to 
modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java
index ee298c8a1d..41cf27d845 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.deployunit.message;
-
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
-import org.apache.ignite.network.annotations.Transferable;
+package org.apache.ignite.deployment;
 
 /**
- * Deploy unit response.
+ * Status of deployment process.
  */
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
-public interface DeployUnitResponse extends NetworkMessage {
-    /**
-     * Returns error which happens on deploy process.
-     *
-     * @return error which happens on deploy process.
-     */
-    @Marshallable
-    Throwable error();
+public enum DeploymentStatus {
+    /** Unit deployment is in progress. */
+    UPLOADING,
+
+    /** Unit is deployed on the cluster. */
+    DEPLOYED,
+
+    /** Remove command was initiated for the unit and it will be removed soon. */
+    OBSOLETE,
+
+    /** Unit removal from the cluster is in progress. */
+    REMOVING
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java 
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
index d74ccf8db3..63514b091a 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
@@ -25,17 +25,6 @@ import org.apache.ignite.deployment.version.Version;
  * Provides access to the Deployment Unit functionality.
  */
 public interface IgniteDeployment {
-    /**
-     * Deploy provided unit to current node with latest version.
-     *
-     * @param id Unit identifier. Not empty and not null.
-     * @param deploymentUnit Unit content.
-     * @return Future with success or not result.
-     */
-    default CompletableFuture<Boolean> deployAsync(String id, DeploymentUnit 
deploymentUnit) {
-        return deployAsync(id, Version.LATEST, deploymentUnit);
-    }
-
     /**
      * Deploy provided unit to current node.
      * After deploy finished, this deployment unit will be place to CMG group 
asynchronously.
diff --git 
a/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java 
b/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
index 771a0fd8db..478d67a220 100644
--- a/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
+++ b/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.ignite.deployment.version.Version;
-import org.apache.ignite.internal.tostring.S;
 
 /**
  * Deployment unit status.
@@ -38,7 +37,7 @@ public class UnitStatus {
     /**
      * Map from existing unit version to list of nodes consistent ids where 
unit deployed.
      */
-    private final Map<Version, List<String>> versionToConsistentIds;
+    private final Map<Version, DeploymentInfo> versionToDeploymentInfo;
 
     /**
      * Constructor.
@@ -47,9 +46,10 @@ public class UnitStatus {
      * @param versionToConsistentIds Map from existing unit version to list
      *      of nodes consistent ids where unit deployed.
      */
-    private UnitStatus(String id, Map<Version, List<String>> 
versionToConsistentIds) {
+    private UnitStatus(String id,
+            Map<Version, DeploymentInfo> versionToConsistentIds) {
         this.id = id;
-        this.versionToConsistentIds = 
Collections.unmodifiableMap(versionToConsistentIds);
+        this.versionToDeploymentInfo = 
Collections.unmodifiableMap(versionToConsistentIds);
     }
 
     /**
@@ -67,7 +67,7 @@ public class UnitStatus {
      * @return unit version.
      */
     public Set<Version> versions() {
-        return Collections.unmodifiableSet(versionToConsistentIds.keySet());
+        return Collections.unmodifiableSet(versionToDeploymentInfo.keySet());
     }
 
     /**
@@ -77,7 +77,11 @@ public class UnitStatus {
      * @return consistent ids of nodes for provided version.
      */
     public List<String> consistentIds(Version version) {
-        return 
Collections.unmodifiableList(versionToConsistentIds.get(version));
+        return 
Collections.unmodifiableList(versionToDeploymentInfo.get(version).consistentIds());
+    }
+
+    public DeploymentStatus status(Version version) {
+        return versionToDeploymentInfo.get(version).status();
     }
 
     /**
@@ -102,19 +106,21 @@ public class UnitStatus {
             return false;
         }
         UnitStatus that = (UnitStatus) o;
-        return Objects.equals(id, that.id) && 
Objects.equals(versionToConsistentIds, that.versionToConsistentIds);
+        return Objects.equals(id, that.id) && 
Objects.equals(versionToDeploymentInfo, that.versionToDeploymentInfo);
     }
 
     /** {@inheritDoc} */
     @Override
     public int hashCode() {
-        return Objects.hash(id, versionToConsistentIds);
+        return Objects.hash(id, versionToDeploymentInfo);
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString() {
-        return S.toString(this);
+        return "UnitStatus{"
+                + "id='" + id + '\''
+                + ", versionToDeploymentInfo=" + versionToDeploymentInfo
+                + '}';
     }
 
     /**
@@ -123,7 +129,7 @@ public class UnitStatus {
     public static class UnitStatusBuilder {
 
         private final String id;
-        private final Map<Version, List<String>> versionToConsistentIds = new 
HashMap<>();
+        private final Map<Version, DeploymentInfo> versionToInfo = new 
HashMap<>();
 
         /**
          * Constructor.
@@ -138,11 +144,11 @@ public class UnitStatus {
          * Append node consistent ids with provided version.
          *
          * @param version Unit version.
-         * @param consistentIds Node consistent ids.
+         * @param deploymentInfo Deployment info (status and node consistent ids) of the version.
          * @return {@code this} builder for use in a chained invocation.
          */
-        public UnitStatusBuilder append(Version version, List<String> 
consistentIds) {
-            versionToConsistentIds.put(version, consistentIds);
+        public UnitStatusBuilder append(Version version, DeploymentInfo 
deploymentInfo) {
+            versionToInfo.put(version, deploymentInfo);
             return this;
         }
 
@@ -152,7 +158,7 @@ public class UnitStatus {
          * @return {@link UnitStatus} instance.
          */
         public UnitStatus build() {
-            return new UnitStatus(id, versionToConsistentIds);
+            return new UnitStatus(id, versionToInfo);
         }
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
new file mode 100644
index 0000000000..6370a5a3f3
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.StopDeployRequest;
+import org.apache.ignite.internal.deployunit.message.StopDeployRequestImpl;
+import org.apache.ignite.internal.deployunit.message.StopDeployResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Messaging service for deploy actions.
+ */
+public class DeployMessagingService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeployMessagingService.class);
+
+    private static final ChannelType DEPLOYMENT_CHANNEL = 
ChannelType.register((short) 1, "DeploymentUnits");
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * File deployer service.
+     */
+    private final FileDeployerService deployerService;
+
+    /**
+     * Tracker of deploy actions.
+     */
+    private final DeployTracker tracker;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param cmgManager CMG manager.
+     * @param deployerService File deploying service.
+     * @param tracker Deploy action tracker.
+     */
+    public DeployMessagingService(
+            ClusterService clusterService,
+            ClusterManagementGroupManager cmgManager,
+            FileDeployerService deployerService,
+            DeployTracker tracker
+    ) {
+        this.clusterService = clusterService;
+        this.cmgManager = cmgManager;
+        this.deployerService = deployerService;
+        this.tracker = tracker;
+    }
+
+    /**
+     * Subscribe to all deployment messages.
+     */
+    public void subscribe() {
+        
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof DeployUnitRequest) {
+                        processDeployRequest((DeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof UndeployUnitRequest) {
+                        processUndeployRequest((UndeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof StopDeployRequest) {
+                        processStopDeployRequest((StopDeployRequest) message, 
senderConsistentId, correlationId);
+                    }
+                });
+    }
+
+    /**
+     * Start deployment process to all nodes from CMG group.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @param unitName Deployment unit file name.
+     * @param unitContent Deployment unit file content.
+     * @return Future with deployment result.
+     */
+    public CompletableFuture<List<String>> startDeployAsyncToCmg(String id, 
Version version, String unitName, byte[] unitContent) {
+        DeployUnitRequest request = DeployUnitRequestImpl.builder()
+                .unitName(unitName)
+                .id(id)
+                .version(version.render())
+                .unitContent(unitContent)
+                .build();
+        return cmgManager.cmgNodes()
+                .thenCompose(nodes -> deploy(nodes, request));
+    }
+
+    private CompletableFuture<List<String>> deploy(Set<String> nodes, 
DeployUnitRequest request) {
+        CompletableFuture<List<String>> resultFuture = new 
CompletableFuture<>();
+        Map<String, Boolean> results = new ConcurrentHashMap<>();
+        CompletableFuture<Void> allDeployment = CompletableFuture.allOf(
+                nodes.stream().map(node -> 
clusterService.topologyService().getByConsistentId(node))
+                        .map(node -> requestDeploy(node, request)
+                                .thenAccept(deployed -> 
results.put(node.name(), deployed)))
+                        .toArray(CompletableFuture[]::new));
+
+        allDeployment.thenAccept(v -> resultFuture.complete(
+                results.entrySet().stream()
+                        .filter(Entry::getValue)
+                        .map(Entry::getKey)
+                        .collect(Collectors.toList()))
+        );
+
+        return resultFuture;
+    }
+
+    /**
+     * Stop all in-progress deployment processes for deployment unit with provided id and version.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @return Future with stop result.
+     */
+    public CompletableFuture<Void> stopInProgressDeploy(String id, Version 
version) {
+        LOG.info("Stop in progress deploy for " + id + ":" + version);
+        return CompletableFuture.allOf(cmgManager.logicalTopology()
+                .thenApply(topology -> topology.nodes().stream().map(node ->
+                                clusterService.messagingService()
+                                        .invoke(node,
+                                                DEPLOYMENT_CHANNEL,
+                                                StopDeployRequestImpl
+                                                        .builder()
+                                                        .id(id)
+                                                        
.version(version.render())
+                                                        .build(),
+                                                Long.MAX_VALUE)
+                        ).toArray(CompletableFuture[]::new)));
+    }
+
+    /**
+     * Start undeploy process from provided node with provided id and version.
+     *
+     * @param node Cluster node.
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @return Future with undeploy result.
+     */
+    public CompletableFuture<Void> undeploy(ClusterNode node, String id, 
Version version) {
+        return clusterService.messagingService()
+                .invoke(node,
+                        DEPLOYMENT_CHANNEL,
+                        UndeployUnitRequestImpl.builder()
+                                .id(id)
+                                .version(version.render())
+                                .build(),
+                        Long.MAX_VALUE
+                ).thenAccept(message ->
+                        LOG.info("Undeploy unit " + id + ":" + version + " 
from node " + node + " finished"));
+    }
+
+    private CompletableFuture<Boolean> requestDeploy(ClusterNode clusterNode, 
DeployUnitRequest request) {
+        return clusterService.messagingService()
+                .invoke(clusterNode, DEPLOYMENT_CHANNEL, request, 
Long.MAX_VALUE)
+                .thenCompose(message -> {
+                    boolean success = ((DeployUnitResponse) message).success();
+                    if (!success) {
+                        LOG.error("Failed to deploy unit " + request.id() + 
":" + request.version()
+                                + " to node " + clusterNode);
+                    }
+                    return CompletableFuture.completedFuture(success);
+                });
+    }
+
+    private void processStopDeployRequest(StopDeployRequest request, String 
senderConsistentId, long correlationId) {
+        tracker.cancelIfDeploy(request.id(), 
Version.parseVersion(request.version()));
+        clusterService.messagingService()
+                .respond(senderConsistentId, 
StopDeployResponseImpl.builder().build(), correlationId);
+
+    }
+
+    private void processDeployRequest(DeployUnitRequest executeRequest, String 
senderConsistentId, long correlationId) {
+        String id = executeRequest.id();
+        String version = executeRequest.version();
+        deployerService.deploy(id, version, executeRequest.unitName(), 
executeRequest.unitContent())
+                .thenAccept(success -> 
clusterService.messagingService().respond(
+                        senderConsistentId,
+                        DEPLOYMENT_CHANNEL,
+                        
DeployUnitResponseImpl.builder().success(success).build(),
+                        correlationId)
+                );
+    }
+
+    private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
+        LOG.info("Start to undeploy " + executeRequest.id() + " with version " 
+ executeRequest.version() + " from "
+                + clusterService.topologyService().localMember().name());
+        deployerService.undeploy(executeRequest.id(), executeRequest.version())
+                .thenRun(() -> clusterService.messagingService()
+                        .respond(senderConsistentId, 
UndeployUnitResponseImpl.builder().build(), correlationId));
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMetastoreService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMetastoreService.java
new file mode 100644
index 0000000000..1222bb32a6
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMetastoreService.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.key.UnitKey.allUnits;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.withId;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.function.Consumer;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Service for metastore access of deployment units.
+ */
+public class DeployMetastoreService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeployMetastoreService.class);
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    public DeployMetastoreService(MetaStorageManager metaStorage) {
+        this.metaStorage = metaStorage;
+    }
+
+    public CompletableFuture<Boolean> updateMeta(String id, Version version, 
Consumer<UnitMeta> transformer) {
+        return updateMeta(id, version, false, transformer);
+    }
+
+    /**
+     * Update deployment unit meta.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @param transformer Deployment unit meta transformer.
+     * @return Future with update result.
+     */
+    public CompletableFuture<Boolean> updateMeta(String id, Version version, 
boolean force, Consumer<UnitMeta> transformer) {
+        LOG.info("Update meta for " + id + ":" + version);
+        ByteArray key = key(id, version);
+        return metaStorage.get(key)
+                .thenCompose(e -> {
+                    LOG.info("Revision " + e.revision() + " for " + id + ":" + 
version);
+                    if (e.value() == null) {
+                        return CompletableFuture.completedFuture(false);
+                    }
+                    UnitMeta prev = UnitMetaSerializer.deserialize(e.value());
+
+                    transformer.accept(prev);
+
+                    return metaStorage.invoke(
+                            force ? exists(key) : 
revision(key).le(e.revision()),
+                            put(key, UnitMetaSerializer.serialize(prev)),
+                            noop());
+                });
+    }
+
+    /**
+     * Put deployment unit meta if not exists.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @param meta Deployment unit meta.
+     * @return Future with put result.
+     */
+    public CompletableFuture<Boolean> putIfNotExist(String id, Version 
version, UnitMeta meta) {
+        ByteArray key = key(id, version);
+        Operation put = put(key, UnitMetaSerializer.serialize(meta));
+        return metaStorage.invoke(notExists(key), put, noop());
+    }
+
+    /**
+     * Remove deployment unit meta if exist.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     */
+    public void removeIfExist(String id, Version version) {
+        ByteArray key = key(id, version);
+        metaStorage.invoke(exists(key), remove(key), noop());
+    }
+
+    /**
+     * Returns {@link Publisher} instance with all deployment unit metas.
+     *
+     * @return {@link Publisher} instance with all deployment unit metas.
+     */
+    public Publisher<Entry> getAll() {
+        return metaStorage.prefix(allUnits());
+    }
+
+    /**
+     * Returns {@link Publisher} instance with all deployment unit metas with provided id.
+     *
+     * @param id Deployment unit identifier.
+     * @return {@link Publisher} instance with all deployment unit metas with provided id.
+     */
+    public Publisher<Entry> getAllWithId(String id) {
+        return metaStorage.prefix(withId(id));
+    }
+
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
new file mode 100644
index 0000000000..a32a3e978d
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Deploy actions tracker.
+ */
+public class DeployTracker {
+    /**
+     * In flight futures tracker.
+     */
+    private final Map<ByteArray, InFlightFutures> inFlightFutures = new 
ConcurrentHashMap<>();
+
+    /**
+     * Track deploy action.
+     *
+     * @param <T> Future result type.
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @param trackableAction Deploy action.
+     * @return {@code trackableAction}.
+     */
+    public <T> CompletableFuture<T> track(String id, Version version, 
CompletableFuture<T> trackableAction) {
+        return inFlightFutures.computeIfAbsent(key(id, version), k -> new 
InFlightFutures()).registerFuture(trackableAction);
+    }
+
+    /**
+     * Cancel deploy action for deployment unit with provided id and version 
if exists.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     */
+    public void cancelIfDeploy(String id, Version version) {
+        InFlightFutures futureTracker = inFlightFutures.get(key(id, version));
+        if (futureTracker != null) {
+            futureTracker.cancelInFlightFutures();
+        }
+    }
+
+    /**
+     * Cancel all deploy actions.
+     */
+    public void cancelAll() {
+        
inFlightFutures.values().forEach(InFlightFutures::cancelInFlightFutures);
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 855b902844..470110e673 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -17,26 +17,18 @@
 
 package org.apache.ignite.internal.deployunit;
 
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.SYNC;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-import static org.apache.ignite.internal.deployunit.key.UnitKey.allUnits;
-import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
-import static org.apache.ignite.internal.deployunit.key.UnitKey.withId;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.ignite.deployment.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.deployment.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.deployment.DeploymentStatus.REMOVING;
 
 import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.DeploymentStatus;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.deployment.IgniteDeployment;
 import org.apache.ignite.deployment.UnitStatus;
@@ -47,32 +39,14 @@ import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExis
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
 import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
 import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
-import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
-import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
-import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
-import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
-import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
-import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
-import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitResponse;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
 import org.apache.ignite.internal.deployunit.metastore.EntrySubscriber;
 import org.apache.ignite.internal.deployunit.metastore.SortedListAccumulator;
 import org.apache.ignite.internal.deployunit.metastore.UnitStatusAccumulator;
 import org.apache.ignite.internal.deployunit.metastore.UnitsAccumulator;
-import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ChannelType;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 
 /**
@@ -82,14 +56,10 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
 
     private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
 
-    private static final String TMP_SUFFIX = ".tmp";
-
-    private static final ChannelType DEPLOYMENT_CHANNEL = 
ChannelType.register((short) 1, "DeploymentUnits");
-
     /**
-     * Meta storage.
+     * Node working directory.
      */
-    private final MetaStorageManager metaStorage;
+    private final Path workDir;
 
     /**
      * Deployment configuration.
@@ -101,20 +71,20 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
      */
     private final ClusterManagementGroupManager cmgManager;
 
-    /**
-     * In flight futures tracker.
-     */
-    private final InFlightFutures inFlightFutures = new InFlightFutures();
 
     /**
      * Cluster service.
      */
     private final ClusterService clusterService;
 
-    /**
-     * Folder for units.
-     */
-    private Path unitsFolder;
+
+    private final DeployMessagingService messaging;
+
+    private final FileDeployerService deployer;
+
+    private final DeployMetastoreService metastore;
+
+    private final DeployTracker tracker;
 
     /**
      * Constructor.
@@ -131,10 +101,13 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
             DeploymentConfiguration configuration,
             ClusterManagementGroupManager cmgManager) {
         this.clusterService = clusterService;
-        this.metaStorage = metaStorage;
         this.configuration = configuration;
         this.cmgManager = cmgManager;
-        unitsFolder = workDir;
+        this.workDir = workDir;
+        this.tracker = new DeployTracker();
+        metastore = new DeployMetastoreService(metaStorage);
+        deployer = new FileDeployerService();
+        messaging = new DeployMessagingService(clusterService, cmgManager, 
deployer, tracker);
     }
 
     @Override
@@ -143,67 +116,46 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
         Objects.requireNonNull(version);
         Objects.requireNonNull(deploymentUnit);
 
-        ByteArray key = key(id, version.render());
-        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(), 
Collections.emptyList());
-
-        Operation put = put(key, UnitMetaSerializer.serialize(meta));
-
-        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(), 
DeploymentStatus.UPLOADING, Collections.emptyList());
 
+        byte[] unitContent;
         try {
-            builder.unitContent(deploymentUnit.content().readAllBytes());
+            unitContent = deploymentUnit.content().readAllBytes();
         } catch (IOException e) {
             LOG.error("Error to read deployment unit content", e);
             return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
         }
-        DeployUnitRequest request = builder
-                .unitName(deploymentUnit.name())
-                .id(id)
-                .version(version.render())
-                .build();
 
-        return metaStorage.invoke(notExists(key), put, Operations.noop())
+        CompletableFuture<Boolean> result = metastore.putIfNotExist(id, 
version, meta)
                 .thenCompose(success -> {
                     if (success) {
-                        return doDeploy(request);
+                        return deployer.deploy(id, version.render(), 
deploymentUnit.name(), unitContent);
                     }
                     LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
                     return CompletableFuture.failedFuture(
                             new DeploymentUnitAlreadyExistsException(id,
                                     "Unit " + id + ":" + version + " already 
exists"));
                 })
+                .thenCompose(deployed -> {
+                    if (deployed) {
+                        return metastore.updateMeta(id, version,
+                                meta1 -> 
meta1.addConsistentId(clusterService.topologyService().localMember().name()));
+                    }
+                    return CompletableFuture.completedFuture(false);
+                })
                 .thenApply(completed -> {
                     if (completed) {
-                        startDeployAsyncToCmg(request);
+                        messaging.startDeployAsyncToCmg(id, version, 
deploymentUnit.name(), unitContent)
+                                .thenAccept(ids -> metastore.updateMeta(id, 
version, unitMeta -> {
+                                    for (String consistentId : ids) {
+                                        unitMeta.addConsistentId(consistentId);
+                                    }
+                                    unitMeta.updateStatus(DEPLOYED);
+                                }));
                     }
                     return completed;
                 });
-    }
-
-    private void startDeployAsyncToCmg(DeployUnitRequest request) {
-        cmgManager.cmgNodes()
-                .thenAccept(nodes -> {
-                    for (String node : nodes) {
-                        ClusterNode clusterNode = 
clusterService.topologyService().getByConsistentId(node);
-                        if (clusterNode != null) {
-                            
inFlightFutures.registerFuture(requestDeploy(clusterNode, request));
-                        }
-                    }
-                });
-    }
-
-    private CompletableFuture<Boolean> requestDeploy(ClusterNode clusterNode, 
DeployUnitRequest request) {
-        return clusterService.messagingService()
-                .invoke(clusterNode, request, Long.MAX_VALUE)
-                .thenCompose(message -> {
-                    Throwable error = ((DeployUnitResponse) message).error();
-                    if (error != null) {
-                        LOG.error("Failed to deploy unit " + request.id() + 
":" + request.version()
-                                + " to node " + clusterNode, error);
-                        return CompletableFuture.failedFuture(error);
-                    }
-                    return CompletableFuture.completedFuture(true);
-                });
+        return tracker.track(id, version, result);
     }
 
     @Override
@@ -211,9 +163,15 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
         checkId(id);
         Objects.requireNonNull(version);
 
-        ByteArray key = key(id, version.render());
-
-        return metaStorage.invoke(exists(key), Operations.remove(key), 
Operations.noop())
+        return messaging.stopInProgressDeploy(id, version)
+                .thenCompose(v -> metastore.updateMeta(id, version, true, meta 
-> meta.updateStatus(OBSOLETE)))
+                .thenCompose(success -> {
+                    if (success) {
+                        //TODO: Check unit usages here. If the unit is used in a 
compute task, we cannot just remove it.
+                        return metastore.updateMeta(id, version, true, meta -> 
meta.updateStatus(REMOVING));
+                    }
+                    return CompletableFuture.completedFuture(false);
+                })
                 .thenCompose(success -> {
                     if (success) {
                         return cmgManager.logicalTopology();
@@ -221,22 +179,10 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
                     return CompletableFuture.failedFuture(new 
DeploymentUnitNotFoundException(
                             "Unit " + id + " with version " + version + " 
doesn't exist"));
                 }).thenApply(logicalTopologySnapshot -> {
-                    for (ClusterNode node : logicalTopologySnapshot.nodes()) {
-                        clusterService.messagingService()
-                                .invoke(node, DEPLOYMENT_CHANNEL,
-                                        UndeployUnitRequestImpl.builder()
-                                                .id(id)
-                                                .version(version.render())
-                                                .build(),
-                                        Long.MAX_VALUE)
-                                .thenAccept(message -> {
-                                    Throwable error = ((UndeployUnitResponse) 
message).error();
-                                    if (error != null) {
-                                        LOG.error("Failed to undeploy unit " + 
id + ":" + version
-                                                + " from node " + node, error);
-                                    }
-                                });
-                    }
+                    allOf(logicalTopologySnapshot.nodes().stream()
+                            .map(node -> messaging.undeploy(node, id, version))
+                            .toArray(CompletableFuture[]::new))
+                            .thenAccept(unused -> metastore.removeIfExist(id, 
version));
                     return null;
                 });
     }
@@ -244,7 +190,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
     @Override
     public CompletableFuture<List<UnitStatus>> unitsAsync() {
         CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
-        metaStorage.prefix(allUnits())
+        metastore.getAll()
                 .subscribe(new EntrySubscriber<>(result, new 
UnitsAccumulator()));
         return result;
     }
@@ -253,7 +199,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
     public CompletableFuture<List<Version>> versionsAsync(String id) {
         checkId(id);
         CompletableFuture<List<Version>> result = new CompletableFuture<>();
-        metaStorage.prefix(withId(id))
+        metastore.getAllWithId(id)
                 .subscribe(
                         new EntrySubscriber<>(
                                 result,
@@ -267,7 +213,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
     public CompletableFuture<UnitStatus> statusAsync(String id) {
         checkId(id);
         CompletableFuture<UnitStatus> result = new CompletableFuture<>();
-        metaStorage.prefix(withId(id))
+        metastore.getAllWithId(id)
                 .subscribe(new EntrySubscriber<>(result, new 
UnitStatusAccumulator(id)));
         return result;
     }
@@ -277,7 +223,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
         Objects.requireNonNull(consistentId);
 
         CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
-        metaStorage.prefix(allUnits())
+        metastore.getAll()
                 .subscribe(
                         new EntrySubscriber<>(
                                 result,
@@ -289,83 +235,13 @@ public class DeploymentManagerImpl implements 
IgniteDeployment, IgniteComponent
 
     @Override
     public void start() {
-        unitsFolder = 
unitsFolder.resolve(configuration.deploymentLocation().value());
-        
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
-                (message, senderConsistentId, correlationId) -> {
-                    if (message instanceof DeployUnitRequest) {
-                        processDeployRequest((DeployUnitRequest) message, 
senderConsistentId, correlationId);
-                    } else if (message instanceof UndeployUnitRequest) {
-                        processUndeployRequest((UndeployUnitRequest) message, 
senderConsistentId, correlationId);
-                    }
-                });
-    }
-
-    private void processDeployRequest(DeployUnitRequest executeRequest, String 
senderConsistentId, long correlationId) {
-        doDeploy(executeRequest).whenComplete((success, throwable) -> {
-            DeployUnitResponseBuilder builder = 
DeployUnitResponseImpl.builder();
-            if (throwable != null) {
-                builder.error(throwable);
-            }
-            clusterService.messagingService().respond(senderConsistentId, 
DEPLOYMENT_CHANNEL,
-                    builder.build(), correlationId);
-        });
-    }
-
-    private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
-        try {
-            Path unitPath = unitsFolder
-                    .resolve(executeRequest.id())
-                    .resolve(executeRequest.version());
-
-            IgniteUtils.deleteIfExistsThrowable(unitPath);
-        } catch (IOException e) {
-            LOG.error("Failed to undeploy unit " + executeRequest.id() + ":" + 
executeRequest.version(), e);
-            clusterService.messagingService()
-                    .respond(senderConsistentId, DEPLOYMENT_CHANNEL, 
UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
-            return;
-        }
-
-        clusterService.messagingService()
-                .respond(senderConsistentId, DEPLOYMENT_CHANNEL, 
UndeployUnitResponseImpl.builder().build(), correlationId);
-    }
-
-    private CompletableFuture<Boolean> doDeploy(DeployUnitRequest 
executeRequest) {
-        String id = executeRequest.id();
-        String version = executeRequest.version();
-        try {
-            Path unitPath = unitsFolder
-                    .resolve(executeRequest.id())
-                    .resolve(executeRequest.version())
-                    .resolve(executeRequest.unitName());
-
-            Path unitPathTmp = unitPath.resolveSibling(unitPath.getFileName() 
+ TMP_SUFFIX);
-
-            Files.createDirectories(unitPathTmp.getParent());
-
-            Files.write(unitPathTmp, executeRequest.unitContent(), CREATE, 
SYNC, TRUNCATE_EXISTING);
-            Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING);
-        } catch (IOException e) {
-            LOG.error("Failed to deploy unit " + executeRequest.id() + ":" + 
executeRequest.version(), e);
-            return CompletableFuture.failedFuture(e);
-        }
-
-        ByteArray key = key(id, version);
-        return metaStorage.get(key)
-                .thenCompose(e -> {
-                    UnitMeta prev = UnitMetaSerializer.deserialize(e.value());
-
-                    
prev.addConsistentId(clusterService.topologyService().localMember().name());
-
-                    return metaStorage.invoke(
-                            revision(key).eq(e.revision()),
-                            put(key, UnitMetaSerializer.serialize(prev)),
-                            Operations.noop());
-                });
+        
deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value()));
+        messaging.subscribe();
     }
 
     @Override
     public void stop() throws Exception {
-        inFlightFutures.cancelInFlightFutures();
+        tracker.cancelAll();
     }
 
     private static void checkId(String id) {
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
new file mode 100644
index 0000000000..73715e879b
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.SYNC;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Service for deploying files on the local file system.
+ */
+public class FileDeployerService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileDeployerService.class);
+
+    private static final String TMP_SUFFIX = ".tmp";
+
+    private static final int DEPLOYMENT_EXECUTOR_SIZE = 4;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(
+            DEPLOYMENT_EXECUTOR_SIZE, new NamedThreadFactory("deployment", 
LOG));
+
+    public void initUnitsFolder(Path unitsFolder) {
+        this.unitsFolder = unitsFolder;
+    }
+
+    /**
+     * Deploy the provided unit on the local file system.
+     *
+     * @param id Deploy unit identifier.
+     * @param version Deploy unit version.
+     * @param unitName Deploy unit file name.
+     * @param unitContent Deploy unit content.
+     * @return Future with deploy result.
+     */
+    public CompletableFuture<Boolean> deploy(String id, String version, String 
unitName, byte[] unitContent) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                Path unitPath = unitsFolder
+                        .resolve(id)
+                        .resolve(version)
+                        .resolve(unitName);
+
+                Path unitPathTmp = 
unitPath.resolveSibling(unitPath.getFileName() + TMP_SUFFIX);
+
+                Files.createDirectories(unitPathTmp.getParent());
+
+                Files.write(unitPathTmp, unitContent, CREATE, SYNC, 
TRUNCATE_EXISTING);
+                Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, 
REPLACE_EXISTING);
+                return true;
+            } catch (IOException e) {
+                LOG.error("Failed to deploy unit " + id + ":" + version, e);
+                return false;
+            }
+        }, executor);
+    }
+
+    /**
+     * Undeploy unit with provided identifier and version.
+     *
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @return Future with undeploy result.
+     */
+    public CompletableFuture<Boolean> undeploy(String id, String version) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                Path unitPath = unitsFolder
+                        .resolve(id)
+                        .resolve(version);
+
+                IgniteUtils.deleteIfExistsThrowable(unitPath);
+                return true;
+            } catch (IOException e) {
+                LOG.error("Failed to undeploy unit " + id + ":" + version, e);
+                return false;
+            }
+        }, executor);
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
index e8b119e593..29e5f0f8df 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployunit;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.ignite.deployment.DeploymentStatus;
 import org.apache.ignite.deployment.version.Version;
 
 /**
@@ -40,6 +41,11 @@ public class UnitMeta {
      */
     private final String name;
 
+    /**
+     * Deployment status.
+     */
+    private DeploymentStatus status;
+
     /**
      * Consistent ids of nodes with.
      */
@@ -53,10 +59,11 @@ public class UnitMeta {
      * @param name Unit name.
      * @param consistentIdLocation Consistent ids of nodes where unit deployed.
      */
-    public UnitMeta(String id, Version version, String name, List<String> 
consistentIdLocation) {
+    public UnitMeta(String id, Version version, String name, DeploymentStatus 
status, List<String> consistentIdLocation) {
         this.id = id;
         this.version = version;
         this.name = name;
+        this.status = status;
         this.consistentIdLocation.addAll(consistentIdLocation);
     }
 
@@ -105,6 +112,14 @@ public class UnitMeta {
         consistentIdLocation.add(id);
     }
 
+    public DeploymentStatus status() {
+        return status;
+    }
+
+    public void updateStatus(DeploymentStatus status) {
+        this.status = status;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -125,6 +140,9 @@ public class UnitMeta {
         if (name != null ? !name.equals(meta.name) : meta.name != null) {
             return false;
         }
+        if (status != meta.status) {
+            return false;
+        }
         return consistentIdLocation.equals(meta.consistentIdLocation);
     }
 
@@ -133,7 +151,19 @@ public class UnitMeta {
         int result = id != null ? id.hashCode() : 0;
         result = 31 * result + (version != null ? version.hashCode() : 0);
         result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + (status != null ? status.hashCode() : 0);
         result = 31 * result + consistentIdLocation.hashCode();
         return result;
     }
+
+    @Override
+    public String toString() {
+        return "UnitMeta{"
+                + "id='" + id + '\''
+                + ", version=" + version
+                + ", name='" + name + '\''
+                + ", status=" + status
+                + ", consistentIdLocation=" + String.join(", ", 
consistentIdLocation)
+                + '}';
+    }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
index fe61f27d17..e79a5dbfc6 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.deployunit.key;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Base64.Encoder;
+import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.lang.ByteArray;
 
 /**
@@ -61,14 +62,14 @@ public final class UnitKey {
      * @param version Required unit version.
      * @return Key in {@link ByteArray} format.
      */
-    public static ByteArray key(String id, String version) {
+    public static ByteArray key(String id, Version version) {
         StringBuilder sb = new StringBuilder(UNITS_PREFIX);
         Encoder encoder = Base64.getEncoder();
         if (id != null) {
             
sb.append(encoder.encodeToString(id.getBytes(StandardCharsets.UTF_8)));
             if (version != null) {
                 sb.append(":");
-                
sb.append(encoder.encodeToString(version.getBytes(StandardCharsets.UTF_8)));
+                
sb.append(encoder.encodeToString(version.render().getBytes(StandardCharsets.UTF_8)));
             }
         }
         return new ByteArray(sb.toString());
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
index 18f7e9b2f4..624bdd17ab 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
@@ -24,6 +24,7 @@ import java.util.Base64;
 import java.util.Base64.Decoder;
 import java.util.Base64.Encoder;
 import java.util.List;
+import org.apache.ignite.deployment.DeploymentStatus;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.deployunit.UnitMeta;
 
@@ -52,6 +53,7 @@ public final class UnitMetaSerializer {
         appendWithEncoding(sb, meta.id());
         appendWithEncoding(sb, meta.version().render());
         appendWithEncoding(sb, meta.name());
+        appendWithEncoding(sb, meta.status().name());
 
         for (String id : meta.consistentIdLocation()) {
             appendWithEncoding(sb, id);
@@ -81,11 +83,13 @@ public final class UnitMetaSerializer {
         String version = new String(decoder.decode(split[1]), UTF_8);
         String unitName = new String(decoder.decode(split[2]), UTF_8);
 
+        DeploymentStatus status = DeploymentStatus.valueOf(new 
String(decoder.decode(split[3]), UTF_8));
+
         List<String> ids = new ArrayList<>();
-        for (int i = 3; i < split.length; i++) {
+        for (int i = 4; i < split.length; i++) {
             ids.add(new String(decoder.decode(split[i]), UTF_8));
         }
 
-        return new UnitMeta(id, Version.parseVersion(version), unitName, ids);
+        return new UnitMeta(id, Version.parseVersion(version), unitName, 
status, ids);
     }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
index 54337f7fd5..19d5b8f5ce 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
@@ -44,4 +44,8 @@ public class DeployUnitMessageTypes {
      * Message type for {@link UndeployUnitResponse}.
      */
     public static final short UNDEPLOY_UNIT_RESPONSE = 3;
+
+    public static final short STOP_DEPLOY_REQUEST = 4;
+
+    public static final short STOP_DEPLOY_RESPONSE = 5;
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
index ee298c8a1d..084874304b 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.deployunit.message;
 
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -27,10 +26,9 @@ import org.apache.ignite.network.annotations.Transferable;
 @Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
 public interface DeployUnitResponse extends NetworkMessage {
     /**
-     * Returns error which happens on deploy process.
+     * Returns whether the deploy process succeeded.
      *
-     * @return error which happens on deploy process.
+     * @return {@code true} if the deploy process succeeded, {@code false} otherwise.
      */
-    @Marshallable
-    Throwable error();
+    boolean success();
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployRequest.java
similarity index 72%
copy from 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
copy to 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployRequest.java
index ee298c8a1d..06bcf74f0e 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployRequest.java
@@ -18,19 +18,24 @@
 package org.apache.ignite.internal.deployunit.message;
 
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Deploy unit response.
+ * Stop deploy request.
  */
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
-public interface DeployUnitResponse extends NetworkMessage {
+@Transferable(DeployUnitMessageTypes.STOP_DEPLOY_REQUEST)
+public interface StopDeployRequest extends NetworkMessage {
     /**
-     * Returns error which happens on deploy process.
+     * Returns deployment unit identifier.
      *
-     * @return error which happens on deploy process.
+     * @return deployment unit identifier.
      */
-    @Marshallable
-    Throwable error();
+    String id();
+
+    /**
+     * Returns deployment unit version.
+     *
+     * @return deployment unit version.
+     */
+    String version();
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployResponse.java
similarity index 72%
copy from 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
copy to 
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployResponse.java
index ee298c8a1d..84a96a251a 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployResponse.java
@@ -17,20 +17,14 @@
 
 package org.apache.ignite.internal.deployunit.message;
 
+import static 
org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes.STOP_DEPLOY_RESPONSE;
+
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Deploy unit response.
+ * Stop deploy response.
  */
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
-public interface DeployUnitResponse extends NetworkMessage {
-    /**
-     * Returns error which happens on deploy process.
-     *
-     * @return error which happens on deploy process.
-     */
-    @Marshallable
-    Throwable error();
+@Transferable(STOP_DEPLOY_RESPONSE)
+public interface StopDeployResponse extends NetworkMessage {
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
index e430724ac4..3eccbb243f 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.deployunit.message;
 
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -26,11 +25,4 @@ import org.apache.ignite.network.annotations.Transferable;
  */
 @Transferable(DeployUnitMessageTypes.UNDEPLOY_UNIT_RESPONSE)
 public interface UndeployUnitResponse extends NetworkMessage {
-    /**
-     * Returns error which happens on undeploy process.
-     *
-     * @return error which happens on undeploy process.
-     */
-    @Marshallable
-    Throwable error();
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ListAccumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ListAccumulator.java
new file mode 100644
index 0000000000..225e0b5d71
--- /dev/null
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ListAccumulator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Accumulator that maps each metastore entry to a value, collects the non-null results and returns them sorted.
+ *
+ * @param <T> Type of the collected list elements; must be naturally comparable because {@link #get()} sorts the list.
+ */
+public class ListAccumulator<T extends Comparable<T>> implements 
Accumulator<List<T>> {
+    private final Function<Entry, T> mapper;
+
+    private final List<T> result = new ArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param mapper Value mapper; entries for which it returns {@code null} are skipped.
+     */
+    public ListAccumulator(Function<Entry, T> mapper) {
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void accumulate(Entry value) {
+        T apply = mapper.apply(value);
+        if (apply != null) {
+            result.add(apply);
+        }
+    }
+
+    @Override
+    public List<T> get() throws AccumulateException {
+        Collections.sort(result);
+        return result;
+    }
+}
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
index db4b2d4686..1a96868589 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.deployunit.metastore;
 
+import org.apache.ignite.deployment.DeploymentInfo;
 import org.apache.ignite.deployment.UnitStatus;
 import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
 import org.apache.ignite.internal.deployunit.UnitMeta;
@@ -46,8 +47,12 @@ public class UnitStatusAccumulator implements 
Accumulator<UnitStatus> {
         if (builder == null) {
             builder = UnitStatus.builder(id);
         }
-        UnitMeta deserialize = UnitMetaSerializer.deserialize(item.value());
-        builder.append(deserialize.version(), 
deserialize.consistentIdLocation());
+        UnitMeta meta = UnitMetaSerializer.deserialize(item.value());
+        builder.append(meta.version(),
+                DeploymentInfo.builder()
+                        .status(meta.status())
+                        .addConsistentIds(meta.consistentIdLocation()).build()
+        );
     }
 
     @Override
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
index e911c88981..6d1e91c555 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentInfo;
 import org.apache.ignite.deployment.UnitStatus;
 import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
 import org.apache.ignite.internal.deployunit.UnitMeta;
@@ -49,7 +50,11 @@ public class UnitsAccumulator implements 
Accumulator<List<UnitStatus>> {
         UnitMeta meta = UnitMetaSerializer.deserialize(item.value());
         if (filter.test(meta)) {
             map.computeIfAbsent(meta.id(), UnitStatus::builder)
-                    .append(meta.version(), meta.consistentIdLocation());
+                    .append(meta.version(),
+                            DeploymentInfo.builder()
+                                    .status(meta.status())
+                                    
.addConsistentIds(meta.consistentIdLocation()).build()
+                    );
         }
     }
 
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
index 695b187cdb..09501557a4 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.deployment;
 
+import static org.apache.ignite.deployment.DeploymentStatus.UPLOADING;
 import static 
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.deserialize;
 import static 
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.serialize;
 import static org.hamcrest.Matchers.is;
@@ -35,7 +36,7 @@ import org.junit.jupiter.api.Test;
 public class UnitMetaSerializerTest {
     @Test
     public void testSerializeDeserializeLatest() {
-        UnitMeta meta = new UnitMeta("id", Version.LATEST, "unitName", 
Arrays.asList("id1", "id2"));
+        UnitMeta meta = new UnitMeta("id", Version.LATEST, "unitName", 
UPLOADING, Arrays.asList("id1", "id2"));
 
         byte[] serialize = serialize(meta);
 
@@ -44,7 +45,7 @@ public class UnitMetaSerializerTest {
 
     @Test
     public void testSerializeDeserializeUnit() {
-        UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"), 
"unitName", Arrays.asList("id1", "id2"));
+        UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"), 
"unitName", UPLOADING, Arrays.asList("id1", "id2"));
 
         byte[] serialize = serialize(meta);
 
@@ -53,7 +54,7 @@ public class UnitMetaSerializerTest {
 
     @Test
     public void testSerializeDeserializeUnitIncompleteVersion() {
-        UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0"), 
"unitName", Arrays.asList("id1", "id2"));
+        UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0"), 
"unitName", UPLOADING, Arrays.asList("id1", "id2"));
 
         byte[] serialize = serialize(meta);
 
@@ -62,7 +63,7 @@ public class UnitMetaSerializerTest {
 
     @Test
     public void testSerializeDeserializeUnitEmptyConsistentId() {
-        UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"), 
"unitName", Collections.emptyList());
+        UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"), 
"unitName", UPLOADING, Collections.emptyList());
 
         byte[] serialize = serialize(meta);
 
@@ -72,7 +73,7 @@ public class UnitMetaSerializerTest {
 
     @Test
     public void testSerializeDeserializeWithSeparatorCharInIdName() {
-        UnitMeta meta = new UnitMeta("id;", Version.parseVersion("3.0.0"), 
"unitName;", Collections.emptyList());
+        UnitMeta meta = new UnitMeta("id;", Version.parseVersion("3.0.0"), 
"unitName;", UPLOADING, Collections.emptyList());
 
         byte[] serialize = serialize(meta);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
index 70174fa308..7a7bd215f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
@@ -36,10 +36,11 @@ public class InFlightFutures implements 
Iterable<CompletableFuture<?>> {
      *
      * @param future the future to register
      */
-    public void registerFuture(CompletableFuture<?> future) {
+    public <T> CompletableFuture<T> registerFuture(CompletableFuture<T> 
future) {
         inFlightFutures.add(future);
 
         future.whenComplete((result, ex) -> inFlightFutures.remove(future));
+        return future;
     }
 
     /**
diff --git 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index 19c382dc74..0178afef8a 100644
--- 
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++ 
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -46,7 +46,7 @@ public class DeploymentManagementController implements 
DeploymentCodeApi {
         try {
             DeploymentUnit deploymentUnit = toDeploymentUnit(unitContent);
             if (unitVersion == null || unitVersion.isBlank()) {
-                return deployment.deployAsync(unitId, deploymentUnit);
+                return deployment.deployAsync(unitId, Version.LATEST, 
deploymentUnit);
             }
             return deployment.deployAsync(unitId, 
Version.parseVersion(unitVersion), deploymentUnit);
         } catch (IOException e) {
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index dc58e3b731..26ea20da90 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -133,6 +133,7 @@ dependencies {
     integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
     
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
     integrationTestImplementation libs.jetbrains.annotations
+    integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.rocksdb.jni
     integrationTestImplementation libs.disruptor
     integrationTestImplementation libs.jackson.databind
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index 8aaf283ac6..a3fb8e1db9 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -19,7 +19,15 @@ package org.apache.ignite.internal.deployment;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.deployment.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.deployment.DeploymentStatus.REMOVING;
+import static org.apache.ignite.deployment.DeploymentStatus.UPLOADING;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -34,7 +42,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import org.apache.ignite.deployment.DeploymentInfo;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.deployment.UnitStatus;
 import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
@@ -42,7 +50,7 @@ import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -50,81 +58,95 @@ import org.junit.jupiter.api.Test;
  * Integration tests for {@link org.apache.ignite.deployment.IgniteDeployment}.
  */
 public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
-    private static final long REPLICA_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
-    private static final long SIZE_IN_BYTES = 1024L;
+    private static final int BASE_REPLICA_TIMEOUT = 30;
 
-    private Path dummyFile;
+    private static final long SMALL_IN_BYTES = 1024L;
+
+    private static final long MEDIUM_IN_BYTES = 1024L * 1024L;
+
+    private static final long BIG_IN_BYTES = 1024L * 1024L * 1024L;
+
+    private DeployFile smallFile;
+
+    private DeployFile mediumFile;
+
+    private DeployFile bigFile;
 
     @BeforeEach
     public void generateDummy() throws IOException {
-        dummyFile = workDir.resolve("dummy.txt");
-
-        if (!Files.exists(dummyFile)) {
-            try (SeekableByteChannel channel = Files.newByteChannel(dummyFile, 
WRITE, CREATE)) {
-                channel.position(SIZE_IN_BYTES - 4);
+        smallFile = create("small.txt", SMALL_IN_BYTES, BASE_REPLICA_TIMEOUT);
+        mediumFile = create("medium.txt", MEDIUM_IN_BYTES, 
BASE_REPLICA_TIMEOUT * 2);
+        bigFile = create("big.txt", BIG_IN_BYTES, BASE_REPLICA_TIMEOUT * 3);
+    }
 
-                ByteBuffer buf = ByteBuffer.allocate(4).putInt(2);
-                buf.rewind();
-                channel.write(buf);
-            }
-        }
+    private DeployFile create(String name, long size, int replicaTimeout) 
throws IOException {
+        DeployFile deployFile = new DeployFile(workDir.resolve(name), size, 
replicaTimeout);
+        deployFile.ensureExist();
+        return deployFile;
     }
 
     @Test
-    public void testDeploy() throws Exception {
-        Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+    public void testDeploy() {
+        String id = "test";
+        Unit unit = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 1);
 
         IgniteImpl cmg = cluster.node(0);
         waitUnitReplica(cmg, unit);
 
-        CompletableFuture<List<UnitStatus>> list = 
node(2).deployment().unitsAsync();
-        UnitStatusBuilder builder = 
UnitStatus.builder(unit.id).append(unit.version, 
List.of(unit.deployedNode.name(), cmg.name()));
-        assertThat(list, willBe(List.of(builder.build())));
+        UnitStatus status = buildStatus(id, unit);
+
+        await().timeout(2, SECONDS)
+                .pollDelay(500, MILLISECONDS)
+                .until(() -> node(2).deployment().unitsAsync(), 
willBe(Collections.singletonList(status)));
     }
 
     @Test
-    public void testDeployUndeploy() throws Exception {
-        Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+    public void testDeployUndeploy() {
+        Unit unit = deployAndVerifySmall("test", 
Version.parseVersion("1.1.0"), 1);
         unit.undeploy();
 
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitClean(cmg, unit);
+
         CompletableFuture<List<UnitStatus>> list = 
node(2).deployment().unitsAsync();
         assertThat(list, willBe(Collections.emptyList()));
     }
 
     @Test
-    public void testDeployTwoUnits() throws Exception {
+    public void testDeployTwoUnits() {
         String id = "test";
-        Unit unit1 = deployAndVerify(id, Version.parseVersion("1.1.0"), 1);
-        Unit unit2 = deployAndVerify(id, Version.parseVersion("1.1.1"), 2);
+        Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 
1);
+        Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"), 
2);
 
         IgniteImpl cmg = cluster.node(0);
         waitUnitReplica(cmg, unit1);
         waitUnitReplica(cmg, unit2);
 
-        CompletableFuture<UnitStatus> list = 
node(2).deployment().statusAsync(id);
-        UnitStatusBuilder status = UnitStatus.builder(id)
-                .append(unit1.version, List.of(unit1.deployedNode.name(), 
cmg.name()))
-                .append(unit2.version, List.of(unit2.deployedNode.name(), 
cmg.name()));
-        assertThat(list, willBe(status.build()));
+        UnitStatus status = buildStatus(id, unit1, unit2);
+
+        await().timeout(2, SECONDS)
+                .pollDelay(100, MILLISECONDS)
+                .until(() -> node(2).deployment().statusAsync(id), 
willBe(status));
 
         CompletableFuture<List<Version>> versions = 
node(2).deployment().versionsAsync(unit1.id);
         assertThat(versions, willBe(List.of(unit1.version, unit2.version)));
     }
 
     @Test
-    public void testDeployTwoUnitsAndUndeployOne() throws Exception {
-        Unit unit1 = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
-        Unit unit2 = deployAndVerify("test", Version.parseVersion("1.1.1"), 2);
+    public void testDeployTwoUnitsAndUndeployOne() {
+        String id = "test";
+        Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 
1);
+        Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"), 
2);
 
         IgniteImpl cmg = cluster.node(0);
         waitUnitReplica(cmg, unit1);
         waitUnitReplica(cmg, unit2);
 
-        CompletableFuture<UnitStatus> list = 
node(2).deployment().statusAsync(unit2.id);
-        UnitStatusBuilder builder = UnitStatus.builder(unit1.id)
-                .append(unit1.version, List.of(unit1.deployedNode.name(), 
cmg.name()))
-                .append(unit2.version, List.of(unit2.deployedNode.name(), 
cmg.name()));
-        assertThat(list, willBe(builder.build()));
+        UnitStatus status = buildStatus(id, unit1, unit2);
+
+        await().timeout(2, SECONDS)
+                .pollDelay(500, MILLISECONDS)
+                .until(() -> node(2).deployment().statusAsync(id), 
willBe(status));
 
         unit2.undeploy();
         CompletableFuture<List<Version>> newVersions = 
node(2).deployment().versionsAsync(unit1.id);
@@ -132,77 +154,149 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    public void testFindByConsistentId() throws InterruptedException {
+    public void testDeploymentStatus() {
+        String id = "test";
+        Version version = Version.parseVersion("1.1.0");
+        Unit unit = deployAndVerifyMedium(id, version, 1);
+
+        CompletableFuture<UnitStatus> status = 
node(2).deployment().statusAsync(id);
+        assertThat(status.thenApply(status1 -> status1.status(version)), 
willBe(UPLOADING));
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit);
+
+        await().timeout(2, SECONDS)
+                .pollDelay(300, MILLISECONDS)
+                .until(() -> node(2).deployment().statusAsync(id)
+                        .thenApply(status1 -> status1.status(version)), 
willBe(DEPLOYED));
+
+        assertThat(unit.undeployAsync(), willSucceedFast());
+
+        assertThat(node(1).deployment().statusAsync(id)
+                .thenApply(status1 -> status1.status(version)), 
willBe(REMOVING));
+
+        waitUnitClean(unit.deployedNode, unit);
+        waitUnitClean(cmg, unit);
+
+        assertThat(node(2).deployment().statusAsync(id)
+                .thenApply(status1 -> status1.status(version)), 
willThrowFast(DeploymentUnitNotFoundException.class));
+    }
+
+    @Test
+    public void testFindByConsistentId() {
         String id = "test";
         String version = "1.1.0";
-        Unit unit = deployAndVerify(id, Version.parseVersion(version), 1);
+        Unit unit = deployAndVerify(id, Version.parseVersion(version), 
mediumFile, 1);
 
         IgniteImpl cmg = cluster.node(0);
         waitUnitReplica(cmg, unit);
 
-        IgniteImpl node = node(1);
+        IgniteImpl node = unit.deployedNode;
+
         CompletableFuture<List<UnitStatus>> nodes = 
node.deployment().findUnitByConsistentIdAsync(node.name());
         assertThat(nodes, willBe(Collections.singletonList(
                         UnitStatus.builder(id)
-                                .append(unit.version, List.of(node.name(), 
cmg.name()))
-                                .build())
+                                .append(unit.version,
+                                        DeploymentInfo.builder()
+                                                .status(DEPLOYED)
+                                                
.addConsistentId(unit.deployedNode.name())
+                                                .addConsistentId(cmg.name())
+                                                .build()
+                                ).build()
+                        )
                 )
         );
 
-        nodes = node.deployment().findUnitByConsistentIdAsync(node.name());
+        nodes = node.deployment().findUnitByConsistentIdAsync(cmg.name());
         assertThat(nodes, willBe(Collections.singletonList(
                         UnitStatus.builder(id)
-                                .append(unit.version, List.of(node.name(), 
cmg.name()))
-                                .build())
+                                .append(unit.version,
+                                        DeploymentInfo.builder()
+                                                .status(DEPLOYED)
+                                                
.addConsistentId(unit.deployedNode.name())
+                                                .addConsistentId(cmg.name())
+                                                .build()
+                                ).build()
                 )
-        );
+        ));
 
         nodes = 
node.deployment().findUnitByConsistentIdAsync("not-existed-node");
         assertThat(nodes, willBe(Collections.emptyList()));
     }
 
-    private Unit deployAndVerify(String id, Version version, int nodeIndex) {
+    private UnitStatus buildStatus(String id, Unit... units) {
+        IgniteImpl cmg = cluster.node(0);
+
+        UnitStatusBuilder builder = UnitStatus.builder(id);
+        for (Unit unit : units) {
+            builder.append(unit.version,
+                    DeploymentInfo.builder()
+                            .status(DEPLOYED)
+                            .addConsistentId(unit.deployedNode.name())
+                            .addConsistentId(cmg.name())
+                            .build()
+            );
+        }
+
+        return builder.build();
+    }
+
+    private Unit deployAndVerify(String id, Version version, DeployFile file, int nodeIndex) {
         IgniteImpl entryNode = node(nodeIndex);
 
         CompletableFuture<Boolean> deploy = entryNode.deployment()
-                .deployAsync(id, version, fromPath(dummyFile));
+                .deployAsync(id, version, fromPath(file.file));
 
         assertThat(deploy, willBe(true));
 
-        Unit unit = new Unit(entryNode, id, version);
-        assertTrue(Files.exists(getNodeUnitFile(unit)));
+        Unit unit = new Unit(entryNode, id, version, file);
+        Path nodeUnitFile = getNodeUnitFile(unit);
+        assertTrue(Files.exists(nodeUnitFile));
 
         return unit;
     }
 
+    private Unit deployAndVerifySmall(String id, Version version, int nodeIndex) {
+        return deployAndVerify(id, version, smallFile, nodeIndex);
+    }
+
+    private Unit deployAndVerifyMedium(String id, Version version, int nodeIndex) {
+        return deployAndVerify(id, version, mediumFile, nodeIndex);
+    }
+
+    private Unit deployAndVerifyBig(String id, Version version, int nodeIndex) {
+        return deployAndVerify(id, version, bigFile, nodeIndex);
+    }
+
     private Path getNodeUnitFile(Unit unit) {
-        return getNodeUnitFile(unit.deployedNode, unit.id, unit.version);
+        return getNodeUnitFile(unit.deployedNode, unit.id, unit.version, unit.file);
     }
 
-    private Path getNodeUnitFile(IgniteImpl node, String unitId, Version unitVersion) {
+    private Path getNodeUnitFile(IgniteImpl node, String unitId, Version unitVersion, DeployFile file) {
         String deploymentFolder = node.nodeConfiguration()
                 .getConfiguration(DeploymentConfiguration.KEY)
                 .deploymentLocation().value();
         Path resolve = workDir.resolve(node.name()).resolve(deploymentFolder);
         return resolve.resolve(unitId)
                 .resolve(unitVersion.render())
-                .resolve(dummyFile.getFileName());
+                .resolve(file.file.getFileName());
     }
 
-    private void waitUnitReplica(IgniteImpl ignite, Unit unit) throws InterruptedException {
-        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
-        assertTrue(IgniteTestUtils.waitForCondition(() -> {
-            try {
-                return Files.exists(unitPath) && Files.size(unitPath) == SIZE_IN_BYTES;
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }, REPLICA_TIMEOUT));
+    private void waitUnitReplica(IgniteImpl ignite, Unit unit) {
+        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version, unit.file);
+
+        await().timeout(unit.file.replicaTimeout, SECONDS)
+                .pollDelay(1, SECONDS)
+                .ignoreException(IOException.class)
+                .until(() -> Files.exists(unitPath) && Files.size(unitPath) == unit.file.expectedSize);
     }
 
-    private void waitUnitClean(IgniteImpl ignite, Unit unit) throws InterruptedException {
-        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
-        assertTrue(IgniteTestUtils.waitForCondition(() -> !Files.exists(unitPath), REPLICA_TIMEOUT));
+    private void waitUnitClean(IgniteImpl ignite, Unit unit) {
+        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version, unit.file);
+
+        await().timeout(unit.file.replicaTimeout, SECONDS)
+                .pollDelay(2, SECONDS)
+                .until(() -> !Files.exists(unitPath));
     }
 
     class Unit {
@@ -212,19 +306,56 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
 
         private final Version version;
 
-        Unit(IgniteImpl deployedNode, String id, Version version) {
+        private final DeployFile file;
+
+        Unit(IgniteImpl deployedNode, String id, Version version, DeployFile file) {
             this.deployedNode = deployedNode;
             this.id = id;
             this.version = version;
+            this.file = file;
         }
 
-        void undeploy() throws InterruptedException {
+        CompletableFuture<Void> undeployAsync() {
+            return deployedNode.deployment().undeployAsync(id, version);
+        }
+
+        void undeploy() {
             deployedNode.deployment().undeployAsync(id, version);
             waitUnitClean(deployedNode, this);
         }
     }
 
-    static DeploymentUnit fromPath(Path path) {
+    private static class DeployFile {
+        private final Path file;
+
+        private final long expectedSize;
+
+        private final int replicaTimeout;
+
+        private DeployFile(Path file, long expectedSize, int replicaTimeout) {
+            this.file = file;
+            this.expectedSize = expectedSize;
+            this.replicaTimeout = replicaTimeout;
+        }
+
+        public void ensureExist() throws IOException {
+            ensureFile(file, expectedSize);
+        }
+
+        private static void ensureFile(Path path, long size) throws IOException {
+            if (!Files.exists(path)) {
+                try (SeekableByteChannel channel = Files.newByteChannel(path, WRITE, CREATE)) {
+                    channel.position(size - 4);
+
+                    ByteBuffer buf = ByteBuffer.allocate(4).putInt(2);
+                    buf.rewind();
+                    channel.write(buf);
+                }
+            }
+        }
+    }
+
+    private static DeploymentUnit fromPath(Path path) {
         Objects.requireNonNull(path);
         return new DeploymentUnit() {
 

Reply via email to