This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 94f3f5f331 IGNITE-19024 Atomic redeploy (#1865)
94f3f5f331 is described below

commit 94f3f5f33199d28b9ad2f670f4e02d08899a89c8
Author: Mikhail <[email protected]>
AuthorDate: Mon Apr 3 13:39:22 2023 +0300

    IGNITE-19024 Atomic redeploy (#1865)
    
    ---------
    
    Co-authored-by: Mikhail Pochatkin <[email protected]>
    Co-authored-by: Pavel Tupitsyn <[email protected]>
---
 .../deployunit/DeployMessagingService.java         | 16 +++--
 .../internal/deployunit/DeploymentManagerImpl.java | 83 +++++++++++++---------
 .../internal/deployunit/FileDeployerService.java   |  2 +-
 .../internal/deployunit/IgniteDeployment.java      | 16 ++++-
 .../internal/deployment/ItDeploymentUnitTest.java  | 34 ++++++---
 5 files changed, 100 insertions(+), 51 deletions(-)

diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index efb6f76e21..7461d2772c 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -213,13 +213,15 @@ public class DeployMessagingService {
     private void processDeployRequest(DeployUnitRequest executeRequest, String 
senderConsistentId, long correlationId) {
         String id = executeRequest.id();
         String version = executeRequest.version();
-        deployerService.deploy(id, version, executeRequest.unitName(), 
executeRequest.unitContent())
-                .thenAccept(success -> 
clusterService.messagingService().respond(
-                        senderConsistentId,
-                        DEPLOYMENT_CHANNEL,
-                        
DeployUnitResponseImpl.builder().success(success).build(),
-                        correlationId)
-                );
+        tracker.track(id, Version.parseVersion(version),
+                deployerService.deploy(id, version, executeRequest.unitName(), 
executeRequest.unitContent())
+                        .thenCompose(success -> 
clusterService.messagingService().respond(
+                                senderConsistentId,
+                                DEPLOYMENT_CHANNEL,
+                                
DeployUnitResponseImpl.builder().success(success).build(),
+                                correlationId)
+                        )
+        );
     }
 
     private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index d071be1008..405691f0fc 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -66,19 +66,29 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
      */
     private final ClusterManagementGroupManager cmgManager;
 
-
     /**
      * Cluster service.
      */
     private final ClusterService clusterService;
 
-
+    /**
+     * Deploy messaging service.
+     */
     private final DeployMessagingService messaging;
 
+    /**
+     * File deployer.
+     */
     private final FileDeployerService deployer;
 
+    /**
+     * Deployment units metastore service.
+     */
     private final DeployMetastoreService metastore;
 
+    /**
+     * Deploy tracker.
+     */
     private final DeployTracker tracker;
 
     /**
@@ -106,7 +116,7 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
     }
 
     @Override
-    public CompletableFuture<Boolean> deployAsync(String id, Version version, 
DeploymentUnit deploymentUnit) {
+    public CompletableFuture<Boolean> deployAsync(String id, Version version, 
boolean force, DeploymentUnit deploymentUnit) {
         checkId(id);
         Objects.requireNonNull(version);
         Objects.requireNonNull(deploymentUnit);
@@ -121,36 +131,43 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
             return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
         }
 
-        CompletableFuture<Boolean> result = metastore.putIfNotExist(id, 
version, meta)
+        return metastore.putIfNotExist(id, version, meta)
                 .thenCompose(success -> {
                     if (success) {
-                        return deployer.deploy(id, version.render(), 
deploymentUnit.name(), unitContent);
-                    }
-                    LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
-                    return CompletableFuture.failedFuture(
-                            new DeploymentUnitAlreadyExistsException(id,
-                                    "Unit " + id + ":" + version + " already 
exists"));
-                })
-                .thenCompose(deployed -> {
-                    if (deployed) {
-                        return metastore.updateMeta(id, version,
-                                meta1 -> 
meta1.addConsistentId(clusterService.topologyService().localMember().name()));
-                    }
-                    return CompletableFuture.completedFuture(false);
-                })
-                .thenApply(completed -> {
-                    if (completed) {
-                        messaging.startDeployAsyncToCmg(id, version, 
deploymentUnit.name(), unitContent)
-                                .thenAccept(ids -> metastore.updateMeta(id, 
version, unitMeta -> {
-                                    for (String consistentId : ids) {
-                                        unitMeta.addConsistentId(consistentId);
+                        return tracker.track(id, version, deployer.deploy(id, 
version.render(), deploymentUnit.name(), unitContent)
+                                .thenCompose(deployed -> {
+                                    if (deployed) {
+                                        return metastore.updateMeta(id, 
version,
+                                                unitMeta -> unitMeta
+                                                        
.addConsistentId(clusterService.topologyService().localMember().name()));
+                                    }
+                                    return 
CompletableFuture.completedFuture(false);
+                                })
+                                .thenApply(completed -> {
+                                    if (completed) {
+                                        messaging.startDeployAsyncToCmg(id, 
version, deploymentUnit.name(), unitContent)
+                                                .thenAccept(ids -> 
metastore.updateMeta(id, version, unitMeta -> {
+                                                    for (String consistentId : 
ids) {
+                                                        
unitMeta.addConsistentId(consistentId);
+                                                    }
+                                                    
unitMeta.updateStatus(DEPLOYED);
+                                                }));
                                     }
-                                    unitMeta.updateStatus(DEPLOYED);
+                                    return completed;
                                 }));
+                    } else {
+                        if (force) {
+                            return undeployAsync(id, version)
+                                    .thenCompose(v -> deployAsync(id, version, 
deploymentUnit));
+                        }
+                        LOG.warn("Failed to deploy meta of unit " + id + ":" + 
version + " to metastore. "
+                                + "Already exists.");
+                        return CompletableFuture.failedFuture(
+                                new DeploymentUnitAlreadyExistsException(id,
+                                        "Unit " + id + ":" + version + " 
already exists"));
                     }
-                    return completed;
+
                 });
-        return tracker.track(id, version, result);
     }
 
     @Override
@@ -173,13 +190,11 @@ public class DeploymentManagerImpl implements 
IgniteDeployment {
                     }
                     return CompletableFuture.failedFuture(new 
DeploymentUnitNotFoundException(
                             "Unit " + id + " with version " + version + " 
doesn't exist"));
-                }).thenApply(logicalTopologySnapshot -> {
-                    allOf(logicalTopologySnapshot.nodes().stream()
-                            .map(node -> messaging.undeploy(node, id, version))
-                            .toArray(CompletableFuture[]::new))
-                            .thenAccept(unused -> metastore.removeIfExist(id, 
version));
-                    return null;
-                });
+                }).thenCompose(logicalTopologySnapshot -> allOf(
+                        logicalTopologySnapshot.nodes().stream()
+                                .map(node -> messaging.undeploy(node, id, 
version))
+                                .toArray(CompletableFuture[]::new))
+                        .thenAccept(unused -> metastore.removeIfExist(id, 
version)));
     }
 
     @Override
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index 73715e879b..fe4c7dfab3 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -105,7 +105,7 @@ public class FileDeployerService {
                 IgniteUtils.deleteIfExistsThrowable(unitPath);
                 return true;
             } catch (IOException e) {
-                LOG.error("Failed to undeploy unit " + id + ":" + version, e);
+                LOG.debug("Failed to undeploy unit " + id + ":" + version, e);
                 return false;
             }
         }, executor);
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
index f59478955a..48d1d3f14b 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
@@ -35,7 +35,21 @@ public interface IgniteDeployment extends IgniteComponent {
      * @param deploymentUnit Unit content.
      * @return Future with success or not result.
      */
-    CompletableFuture<Boolean> deployAsync(String id, Version version, 
DeploymentUnit deploymentUnit);
+    default CompletableFuture<Boolean> deployAsync(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        return deployAsync(id, version, false, deploymentUnit);
+    }
+
+    /**
+     * Deploy provided unit to current node.
+     * After deploy finished, this deployment unit will be place to CMG group 
asynchronously.
+     *
+     * @param id Unit identifier. Not empty and not null.
+     * @param version Unit version.
+     * @param force Force redeploy if unit with provided id and version exists.
+     * @param deploymentUnit Unit content.
+     * @return Future with success or not result.
+     */
+    CompletableFuture<Boolean> deployAsync(String id, Version version, boolean 
force, DeploymentUnit deploymentUnit);
 
     /**
      * Undeploy latest version of unit with corresponding identifier.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index c1a617592b..1ad7c19411 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -22,7 +22,6 @@ import static java.nio.file.StandardOpenOption.WRITE;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
-import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -104,9 +103,11 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     @Test
     public void testDeployUndeploy() {
         Unit unit = deployAndVerifySmall("test", 
Version.parseVersion("1.1.0"), 1);
-        unit.undeploy();
 
         IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit);
+
+        unit.undeploy();
         waitUnitClean(cmg, unit);
 
         CompletableFuture<List<UnitStatus>> list = 
node(2).deployment().unitsAsync();
@@ -173,9 +174,6 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
 
         assertThat(unit.undeployAsync(), willSucceedFast());
 
-        assertThat(node(1).deployment().statusAsync(id)
-                .thenApply(status1 -> status1.status(version)), 
willBe(REMOVING));
-
         waitUnitClean(unit.deployedNode, unit);
         waitUnitClean(cmg, unit);
 
@@ -187,7 +185,7 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     public void testFindByConsistentId() {
         String id = "test";
         String version = "1.1.0";
-        Unit unit = deployAndVerify(id, Version.parseVersion(version), 
mediumFile, 1);
+        Unit unit = deployAndVerifyMedium(id, Version.parseVersion(version), 
1);
 
         IgniteImpl cmg = cluster.node(0);
         waitUnitReplica(cmg, unit);
@@ -225,6 +223,22 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
         assertThat(nodes, willBe(Collections.emptyList()));
     }
 
+    @Test
+    public void testRedeploy() {
+        String id = "test";
+        String version = "1.1.0";
+        Unit smallUnit = deployAndVerify(id, Version.parseVersion(version), 
smallFile, 1);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, smallUnit);
+
+        Unit mediumUnit = deployAndVerify(id, Version.parseVersion(version), 
true, mediumFile, 1);
+        waitUnitReplica(cmg, mediumUnit);
+
+        waitUnitClean(smallUnit.deployedNode, smallUnit);
+        waitUnitClean(cmg, smallUnit);
+    }
+
     private UnitStatus buildStatus(String id, Unit... units) {
         IgniteImpl cmg = cluster.node(0);
 
@@ -243,10 +257,14 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     }
 
     private Unit deployAndVerify(String id, Version version, DeployFile file, 
int nodeIndex) {
+        return deployAndVerify(id, version, false, file, nodeIndex);
+    }
+
+    private Unit deployAndVerify(String id, Version version, boolean force, 
DeployFile file, int nodeIndex) {
         IgniteImpl entryNode = node(nodeIndex);
 
         CompletableFuture<Boolean> deploy = entryNode.deployment()
-                .deployAsync(id, version, fromPath(file.file));
+                .deployAsync(id, version, force, fromPath(file.file));
 
         assertThat(deploy, willBe(true));
 
@@ -258,7 +276,7 @@ public class ItDeploymentUnitTest extends 
ClusterPerTestIntegrationTest {
     }
 
     private Unit deployAndVerifySmall(String id, Version version, int 
nodeIndex) {
-        return deployAndVerify(id, version, smallFile, nodeIndex);
+        return deployAndVerifyMedium(id, version, nodeIndex);
     }
 
     private Unit deployAndVerifyMedium(String id, Version version, int 
nodeIndex) {

Reply via email to