This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 94f3f5f331 IGNITE-19024 Atomic redeploy (#1865)
94f3f5f331 is described below
commit 94f3f5f33199d28b9ad2f670f4e02d08899a89c8
Author: Mikhail <[email protected]>
AuthorDate: Mon Apr 3 13:39:22 2023 +0300
IGNITE-19024 Atomic redeploy (#1865)
---------
Co-authored-by: Mikhail Pochatkin <[email protected]>
Co-authored-by: Pavel Tupitsyn <[email protected]>
---
.../deployunit/DeployMessagingService.java | 16 +++--
.../internal/deployunit/DeploymentManagerImpl.java | 83 +++++++++++++---------
.../internal/deployunit/FileDeployerService.java | 2 +-
.../internal/deployunit/IgniteDeployment.java | 16 ++++-
.../internal/deployment/ItDeploymentUnitTest.java | 34 ++++++---
5 files changed, 100 insertions(+), 51 deletions(-)
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index efb6f76e21..7461d2772c 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -213,13 +213,15 @@ public class DeployMessagingService {
private void processDeployRequest(DeployUnitRequest executeRequest, String
senderConsistentId, long correlationId) {
String id = executeRequest.id();
String version = executeRequest.version();
- deployerService.deploy(id, version, executeRequest.unitName(),
executeRequest.unitContent())
- .thenAccept(success ->
clusterService.messagingService().respond(
- senderConsistentId,
- DEPLOYMENT_CHANNEL,
-
DeployUnitResponseImpl.builder().success(success).build(),
- correlationId)
- );
+ tracker.track(id, Version.parseVersion(version),
+ deployerService.deploy(id, version, executeRequest.unitName(),
executeRequest.unitContent())
+ .thenCompose(success ->
clusterService.messagingService().respond(
+ senderConsistentId,
+ DEPLOYMENT_CHANNEL,
+
DeployUnitResponseImpl.builder().success(success).build(),
+ correlationId)
+ )
+ );
}
private void processUndeployRequest(UndeployUnitRequest executeRequest,
String senderConsistentId, long correlationId) {
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index d071be1008..405691f0fc 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -66,19 +66,29 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
*/
private final ClusterManagementGroupManager cmgManager;
-
/**
* Cluster service.
*/
private final ClusterService clusterService;
-
+ /**
+ * Deploy messaging service.
+ */
private final DeployMessagingService messaging;
+ /**
+ * File deployer.
+ */
private final FileDeployerService deployer;
+ /**
+ * Deployment units metastore service.
+ */
private final DeployMetastoreService metastore;
+ /**
+ * Deploy tracker.
+ */
private final DeployTracker tracker;
/**
@@ -106,7 +116,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
}
@Override
- public CompletableFuture<Boolean> deployAsync(String id, Version version,
DeploymentUnit deploymentUnit) {
+ public CompletableFuture<Boolean> deployAsync(String id, Version version,
boolean force, DeploymentUnit deploymentUnit) {
checkId(id);
Objects.requireNonNull(version);
Objects.requireNonNull(deploymentUnit);
@@ -121,36 +131,43 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
return CompletableFuture.failedFuture(new
DeploymentUnitReadException(e));
}
- CompletableFuture<Boolean> result = metastore.putIfNotExist(id,
version, meta)
+ return metastore.putIfNotExist(id, version, meta)
.thenCompose(success -> {
if (success) {
- return deployer.deploy(id, version.render(),
deploymentUnit.name(), unitContent);
- }
- LOG.error("Failed to deploy meta of unit " + id + ":" +
version);
- return CompletableFuture.failedFuture(
- new DeploymentUnitAlreadyExistsException(id,
- "Unit " + id + ":" + version + " already
exists"));
- })
- .thenCompose(deployed -> {
- if (deployed) {
- return metastore.updateMeta(id, version,
- meta1 ->
meta1.addConsistentId(clusterService.topologyService().localMember().name()));
- }
- return CompletableFuture.completedFuture(false);
- })
- .thenApply(completed -> {
- if (completed) {
- messaging.startDeployAsyncToCmg(id, version,
deploymentUnit.name(), unitContent)
- .thenAccept(ids -> metastore.updateMeta(id,
version, unitMeta -> {
- for (String consistentId : ids) {
- unitMeta.addConsistentId(consistentId);
+ return tracker.track(id, version, deployer.deploy(id,
version.render(), deploymentUnit.name(), unitContent)
+ .thenCompose(deployed -> {
+ if (deployed) {
+ return metastore.updateMeta(id,
version,
+ unitMeta -> unitMeta
+
.addConsistentId(clusterService.topologyService().localMember().name()));
+ }
+ return
CompletableFuture.completedFuture(false);
+ })
+ .thenApply(completed -> {
+ if (completed) {
+ messaging.startDeployAsyncToCmg(id,
version, deploymentUnit.name(), unitContent)
+ .thenAccept(ids ->
metastore.updateMeta(id, version, unitMeta -> {
+ for (String consistentId :
ids) {
+
unitMeta.addConsistentId(consistentId);
+ }
+
unitMeta.updateStatus(DEPLOYED);
+ }));
}
- unitMeta.updateStatus(DEPLOYED);
+ return completed;
}));
+ } else {
+ if (force) {
+ return undeployAsync(id, version)
+ .thenCompose(v -> deployAsync(id, version,
deploymentUnit));
+ }
+ LOG.warn("Failed to deploy meta of unit " + id + ":" +
version + " to metastore. "
+ + "Already exists.");
+ return CompletableFuture.failedFuture(
+ new DeploymentUnitAlreadyExistsException(id,
+ "Unit " + id + ":" + version + "
already exists"));
}
- return completed;
+
});
- return tracker.track(id, version, result);
}
@Override
@@ -173,13 +190,11 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
}
return CompletableFuture.failedFuture(new
DeploymentUnitNotFoundException(
"Unit " + id + " with version " + version + "
doesn't exist"));
- }).thenApply(logicalTopologySnapshot -> {
- allOf(logicalTopologySnapshot.nodes().stream()
- .map(node -> messaging.undeploy(node, id, version))
- .toArray(CompletableFuture[]::new))
- .thenAccept(unused -> metastore.removeIfExist(id,
version));
- return null;
- });
+ }).thenCompose(logicalTopologySnapshot -> allOf(
+ logicalTopologySnapshot.nodes().stream()
+ .map(node -> messaging.undeploy(node, id,
version))
+ .toArray(CompletableFuture[]::new))
+ .thenAccept(unused -> metastore.removeIfExist(id,
version)));
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index 73715e879b..fe4c7dfab3 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -105,7 +105,7 @@ public class FileDeployerService {
IgniteUtils.deleteIfExistsThrowable(unitPath);
return true;
} catch (IOException e) {
- LOG.error("Failed to undeploy unit " + id + ":" + version, e);
+ LOG.debug("Failed to undeploy unit " + id + ":" + version, e);
return false;
}
}, executor);
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
index f59478955a..48d1d3f14b 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/IgniteDeployment.java
@@ -35,7 +35,21 @@ public interface IgniteDeployment extends IgniteComponent {
* @param deploymentUnit Unit content.
* @return Future with success or not result.
*/
- CompletableFuture<Boolean> deployAsync(String id, Version version,
DeploymentUnit deploymentUnit);
+ default CompletableFuture<Boolean> deployAsync(String id, Version version,
DeploymentUnit deploymentUnit) {
+ return deployAsync(id, version, false, deploymentUnit);
+ }
+
+ /**
+ * Deploy provided unit to current node.
+ * After the deploy finishes, this deployment unit will be placed in the CMG group
asynchronously.
+ *
+ * @param id Unit identifier. Not empty and not null.
+ * @param version Unit version.
+ * @param force Force redeploy if unit with provided id and version exists.
+ * @param deploymentUnit Unit content.
+ * @return Future with success or not result.
+ */
+ CompletableFuture<Boolean> deployAsync(String id, Version version, boolean
force, DeploymentUnit deploymentUnit);
/**
* Undeploy latest version of unit with corresponding identifier.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index c1a617592b..1ad7c19411 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -22,7 +22,6 @@ import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
-import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -104,9 +103,11 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
@Test
public void testDeployUndeploy() {
Unit unit = deployAndVerifySmall("test",
Version.parseVersion("1.1.0"), 1);
- unit.undeploy();
IgniteImpl cmg = cluster.node(0);
+ waitUnitReplica(cmg, unit);
+
+ unit.undeploy();
waitUnitClean(cmg, unit);
CompletableFuture<List<UnitStatus>> list =
node(2).deployment().unitsAsync();
@@ -173,9 +174,6 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
assertThat(unit.undeployAsync(), willSucceedFast());
- assertThat(node(1).deployment().statusAsync(id)
- .thenApply(status1 -> status1.status(version)),
willBe(REMOVING));
-
waitUnitClean(unit.deployedNode, unit);
waitUnitClean(cmg, unit);
@@ -187,7 +185,7 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
public void testFindByConsistentId() {
String id = "test";
String version = "1.1.0";
- Unit unit = deployAndVerify(id, Version.parseVersion(version),
mediumFile, 1);
+ Unit unit = deployAndVerifyMedium(id, Version.parseVersion(version),
1);
IgniteImpl cmg = cluster.node(0);
waitUnitReplica(cmg, unit);
@@ -225,6 +223,22 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
assertThat(nodes, willBe(Collections.emptyList()));
}
+ @Test
+ public void testRedeploy() {
+ String id = "test";
+ String version = "1.1.0";
+ Unit smallUnit = deployAndVerify(id, Version.parseVersion(version),
smallFile, 1);
+
+ IgniteImpl cmg = cluster.node(0);
+ waitUnitReplica(cmg, smallUnit);
+
+ Unit mediumUnit = deployAndVerify(id, Version.parseVersion(version),
true, mediumFile, 1);
+ waitUnitReplica(cmg, mediumUnit);
+
+ waitUnitClean(smallUnit.deployedNode, smallUnit);
+ waitUnitClean(cmg, smallUnit);
+ }
+
private UnitStatus buildStatus(String id, Unit... units) {
IgniteImpl cmg = cluster.node(0);
@@ -243,10 +257,14 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
}
private Unit deployAndVerify(String id, Version version, DeployFile file,
int nodeIndex) {
+ return deployAndVerify(id, version, false, file, nodeIndex);
+ }
+
+ private Unit deployAndVerify(String id, Version version, boolean force,
DeployFile file, int nodeIndex) {
IgniteImpl entryNode = node(nodeIndex);
CompletableFuture<Boolean> deploy = entryNode.deployment()
- .deployAsync(id, version, fromPath(file.file));
+ .deployAsync(id, version, force, fromPath(file.file));
assertThat(deploy, willBe(true));
@@ -258,7 +276,7 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
}
private Unit deployAndVerifySmall(String id, Version version, int
nodeIndex) {
- return deployAndVerify(id, version, smallFile, nodeIndex);
+ return deployAndVerifyMedium(id, version, nodeIndex);
}
private Unit deployAndVerifyMedium(String id, Version version, int
nodeIndex) {