This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cd3a977d88 IGNITE-19519 Deployment unit removal (#2194)
cd3a977d88 is described below
commit cd3a977d8814313d1913c76d293d68e07b6a10d5
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Jun 22 13:21:54 2023 +0300
IGNITE-19519 Deployment unit removal (#2194)
---
.../internal/deployunit/DefaultNodeCallback.java | 83 ++++++++++++-----
.../deployunit/DeployMessagingService.java | 38 --------
.../internal/deployunit/DeploymentManagerImpl.java | 99 ++++++++++++--------
.../internal/deployunit/FileDeployerService.java | 4 +-
.../deployunit/message/DeployUnitMessageTypes.java | 14 +--
.../deployunit/message/UndeployUnitRequest.java | 41 ---------
.../deployunit/message/UndeployUnitResponse.java | 28 ------
...ventCallback.java => ClusterEventCallback.java} | 30 +++----
.../metastore/ClusterEventCallbackImpl.java | 100 +++++++++++++++++++++
.../metastore/ClusterStatusWatchListener.java | 58 ++++++++++++
.../metastore/DeploymentUnitFailover.java | 86 ++++++++++++------
.../deployunit/metastore/DeploymentUnitStore.java | 47 +++++++++-
.../metastore/DeploymentUnitStoreImpl.java | 58 ++++++------
.../deployunit/metastore/NodeEventCallback.java | 23 ++---
.../metastore/NodeStatusWatchListener.java | 16 ++--
.../metastore/DeploymentUnitStoreImplTest.java | 56 +++++++++---
.../deployment/ItDeploymentUnitFailoverTest.java | 65 ++++++++------
.../internal/deployment/ItDeploymentUnitTest.java | 80 ++++++++---------
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
19 files changed, 568 insertions(+), 361 deletions(-)
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index 79248e49a2..a84f1e858a 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -18,62 +18,69 @@
package org.apache.ignite.internal.deployunit;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
-import org.apache.ignite.network.ClusterService;
/**
* Default implementation of {@link NodeEventCallback}.
*/
-public class DefaultNodeCallback implements NodeEventCallback {
+public class DefaultNodeCallback extends NodeEventCallback {
private final DeploymentUnitStore deploymentUnitStore;
private final DeployMessagingService messaging;
private final FileDeployerService deployer;
- private final ClusterService clusterService;
-
private final DownloadTracker tracker;
+ private final ClusterManagementGroupManager cmgManager;
+
+ private final String nodeName;
+
/**
* Constructor.
*
* @param deploymentUnitStore Deployment units store.
* @param messaging Deployment messaging service.
* @param deployer Deployment unit file system service.
- * @param clusterService Cluster service.
+ * @param cmgManager Cluster management group manager.
+ * @param nodeName Node consistent ID.
*/
public DefaultNodeCallback(
DeploymentUnitStore deploymentUnitStore,
DeployMessagingService messaging,
FileDeployerService deployer,
- ClusterService clusterService,
- DownloadTracker tracker
+ DownloadTracker tracker,
+ ClusterManagementGroupManager cmgManager,
+ String nodeName
) {
this.deploymentUnitStore = deploymentUnitStore;
this.messaging = messaging;
this.deployer = deployer;
- this.clusterService = clusterService;
this.tracker = tracker;
+ this.cmgManager = cmgManager;
+ this.nodeName = nodeName;
}
@Override
- public void onUploading(UnitNodeStatus status, List<String> holders) {
- tracker.track(status.id(), status.version(),
- () -> messaging.downloadUnitContent(status.id(),
status.version(), holders)
- .thenCompose(content -> deployer.deploy(status.id(),
status.version(), content))
+ public void onUploading(String id, Version version, List<UnitNodeStatus>
holders) {
+ tracker.track(id, version,
+ () -> messaging.downloadUnitContent(id, version, new
ArrayList<>(getDeployedNodeIds(holders)))
+ .thenCompose(content -> deployer.deploy(id, version,
content))
.thenApply(deployed -> {
if (deployed) {
- return deploymentUnitStore.updateNodeStatus(
- getLocalNodeId(),
- status.id(),
- status.version(),
- DEPLOYED);
+ return
deploymentUnitStore.updateNodeStatus(nodeName, id, version, DEPLOYED);
}
return deployed;
})
@@ -81,18 +88,48 @@ public class DefaultNodeCallback implements
NodeEventCallback {
}
@Override
- public void onDeploy(UnitNodeStatus status, List<String> holders) {
- deploymentUnitStore.getClusterStatus(status.id(), status.version())
+ public void onDeploy(String id, Version version, List<UnitNodeStatus>
holders) {
+ Set<String> nodeIds = getDeployedNodeIds(holders);
+ deploymentUnitStore.getClusterStatus(id, version)
.thenApply(UnitClusterStatus::initialNodesToDeploy)
- .thenApply(holders::containsAll)
+ .thenApply(nodeIds::containsAll)
.thenAccept(allRequiredDeployed -> {
if (allRequiredDeployed) {
- deploymentUnitStore.updateClusterStatus(status.id(),
status.version(), DEPLOYED);
+ deploymentUnitStore.updateClusterStatus(id, version,
DEPLOYED);
+ }
+ });
+ }
+
+ @Override
+ public void onObsolete(String id, Version version, List<UnitNodeStatus>
holders) {
+ //TODO: IGNITE-19708
+ deploymentUnitStore.updateNodeStatus(nodeName, id, version, REMOVING);
+ }
+
+ @Override
+ public void onRemoving(String id, Version version, List<UnitNodeStatus>
holders) {
+ cmgManager.logicalTopology()
+ .thenAccept(snapshot -> {
+ Set<String> nodes =
snapshot.nodes().stream().map(LogicalNode::name).collect(Collectors.toSet());
+ boolean allRemoved = holders.stream()
+ .filter(nodeStatus ->
nodes.contains(nodeStatus.nodeId()))
+ .allMatch(nodeStatus -> nodeStatus.status() ==
REMOVING);
+ if (allRemoved) {
+ deploymentUnitStore.updateClusterStatus(id, version,
REMOVING);
}
});
}
- private String getLocalNodeId() {
- return clusterService.topologyService().localMember().name();
+ /**
+ * Returns a set of node IDs where unit is deployed.
+ *
+ * @param holders List of unit node statuses.
+ * @return Set of node IDs where unit is deployed.
+ */
+ private static Set<String> getDeployedNodeIds(List<UnitNodeStatus>
holders) {
+ return holders.stream()
+ .filter(status -> status.status() == DEPLOYED)
+ .map(UnitNodeStatus::nodeId)
+ .collect(Collectors.toSet());
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index 1834907736..0584c5b6d0 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -30,13 +30,9 @@ import
org.apache.ignite.internal.deployunit.message.DownloadUnitResponseImpl;
import org.apache.ignite.internal.deployunit.message.StopDeployRequest;
import org.apache.ignite.internal.deployunit.message.StopDeployRequestImpl;
import org.apache.ignite.internal.deployunit.message.StopDeployResponseImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.network.ChannelType;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
/**
@@ -95,8 +91,6 @@ public class DeployMessagingService {
(message, senderConsistentId, correlationId) -> {
if (message instanceof DownloadUnitRequest) {
processDownloadRequest((DownloadUnitRequest) message,
senderConsistentId, correlationId);
- } else if (message instanceof UndeployUnitRequest) {
- processUndeployRequest((UndeployUnitRequest) message,
senderConsistentId, correlationId);
} else if (message instanceof StopDeployRequest) {
processStopDeployRequest((StopDeployRequest) message,
senderConsistentId, correlationId);
}
@@ -146,27 +140,6 @@ public class DeployMessagingService {
).toArray(CompletableFuture[]::new)));
}
- /**
- * Start undeploy process from provided node with provided id and version.
- *
- * @param node Cluster node.
- * @param id Deployment unit identifier.
- * @param version Deployment unit version.
- * @return Future with undeploy result.
- */
- public CompletableFuture<Void> undeploy(ClusterNode node, String id,
Version version) {
- return clusterService.messagingService()
- .invoke(node,
- DEPLOYMENT_CHANNEL,
- UndeployUnitRequestImpl.builder()
- .id(id)
- .version(version.render())
- .build(),
- Long.MAX_VALUE
- ).thenAccept(message ->
- LOG.info("Undeploy unit " + id + ":" + version + "
from node " + node + " finished"));
- }
-
private void processStopDeployRequest(StopDeployRequest request, String
senderConsistentId, long correlationId) {
tracker.cancelIfDownloading(request.id(),
Version.parseVersion(request.version()));
clusterService.messagingService()
@@ -174,17 +147,6 @@ public class DeployMessagingService {
}
- private void processUndeployRequest(UndeployUnitRequest executeRequest,
String senderConsistentId, long correlationId) {
- LOG.info("Start to undeploy " + executeRequest.id() + " with version "
+ executeRequest.version() + " from "
- + clusterService.topologyService().localMember().name());
- deployerService.undeploy(executeRequest.id(),
Version.parseVersion(executeRequest.version()))
- .thenRun(() -> clusterService.messagingService()
- .respond(senderConsistentId,
- UndeployUnitResponseImpl.builder().build(),
- correlationId)
- );
- }
-
private void processDownloadRequest(DownloadUnitRequest request, String
senderConsistentId, long correlationId) {
deployerService.getUnitContent(request.id(),
Version.parseVersion(request.version()))
.thenApply(content -> clusterService.messagingService()
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index bfa075a318..8e6177fac7 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -22,7 +22,6 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
-import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import java.nio.file.Path;
import java.util.HashMap;
@@ -36,6 +35,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
@@ -43,8 +43,12 @@ import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExis
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
import
org.apache.ignite.internal.deployunit.exception.InvalidNodesArgumentException;
+import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
+import
org.apache.ignite.internal.deployunit.metastore.ClusterEventCallbackImpl;
+import
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitFailover;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
+import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -74,11 +78,6 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
*/
private final ClusterManagementGroupManager cmgManager;
- /**
- * Cluster service.
- */
- private final ClusterService clusterService;
-
/**
* Deploy messaging service.
*/
@@ -109,6 +108,16 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
*/
private final DeploymentUnitAccessor deploymentUnitAccessor;
+ private final String nodeName;
+
+ private final NodeEventCallback nodeStatusCallback;
+
+ private final NodeStatusWatchListener nodeStatusWatchListener;
+
+ private final ClusterEventCallback clusterEventCallback;
+
+ private final ClusterStatusWatchListener clusterStatusWatchListener;
+
/**
* Constructor.
*
@@ -117,6 +126,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
* @param workDir Node working directory.
* @param configuration Deployment configuration.
* @param cmgManager Cluster management group manager.
+ * @param nodeName Node consistent ID.
*/
public DeploymentManagerImpl(
ClusterService clusterService,
@@ -124,9 +134,9 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
LogicalTopologyService logicalTopology,
Path workDir,
DeploymentConfiguration configuration,
- ClusterManagementGroupManager cmgManager
+ ClusterManagementGroupManager cmgManager,
+ String nodeName
) {
- this.clusterService = clusterService;
this.deploymentUnitStore = deploymentUnitStore;
this.configuration = configuration;
this.cmgManager = cmgManager;
@@ -135,7 +145,16 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
deployer = new FileDeployerService();
messaging = new DeployMessagingService(clusterService, cmgManager,
deployer, tracker);
deploymentUnitAccessor = new DeploymentUnitAccessorImpl(deployer);
- failover = new DeploymentUnitFailover(logicalTopology,
deploymentUnitStore, clusterService);
+
+ this.nodeName = nodeName;
+
+ nodeStatusCallback = new DefaultNodeCallback(deploymentUnitStore,
messaging, deployer, tracker, cmgManager, nodeName);
+ nodeStatusWatchListener = new
NodeStatusWatchListener(deploymentUnitStore, nodeName, nodeStatusCallback);
+
+ clusterEventCallback = new
ClusterEventCallbackImpl(deploymentUnitStore, deployer, cmgManager, nodeName);
+ clusterStatusWatchListener = new
ClusterStatusWatchListener(clusterEventCallback);
+
+ failover = new DeploymentUnitFailover(logicalTopology,
deploymentUnitStore, deployer, nodeName);
}
@Override
@@ -150,6 +169,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
Objects.requireNonNull(version);
Objects.requireNonNull(deploymentUnit);
+ LOG.info("Deploying {}:{} on {}", id, version, deployMode);
return extractNodes(deployMode)
.thenCompose(nodesToDeploy ->
doDeploy(id, version, force, deploymentUnit,
nodesToDeploy,
@@ -170,6 +190,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
Objects.requireNonNull(version);
Objects.requireNonNull(deploymentUnit);
+ LOG.info("Deploying {}:{} on {}", id, version, nodes);
return extractNodes(nodes)
.thenCompose(nodesToDeploy ->
doDeploy(id, version, force, deploymentUnit,
nodesToDeploy,
@@ -215,9 +236,8 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
return deployToLocalNode(id, version, unitContent)
.thenApply(completed -> {
if (completed) {
- String localNodeId = getLocalNodeId();
nodesToDeploy.forEach(node -> {
- if (!node.equals(localNodeId)) {
+ if (!node.equals(nodeName)) {
deploymentUnitStore.createNodeStatus(node, id,
version);
}
});
@@ -230,7 +250,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
return deployer.deploy(id, version, unitContent)
.thenCompose(deployed -> {
if (deployed) {
- return
deploymentUnitStore.createNodeStatus(getLocalNodeId(), id, version, DEPLOYED);
+ return deploymentUnitStore.createNodeStatus(nodeName,
id, version, DEPLOYED);
}
return completedFuture(false);
});
@@ -241,25 +261,31 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
checkId(id);
Objects.requireNonNull(version);
+ LOG.info("Undeploying {}:{}", id, version);
return messaging.stopInProgressDeploy(id, version)
.thenCompose(v -> deploymentUnitStore.updateClusterStatus(id,
version, OBSOLETE))
.thenCompose(success -> {
if (success) {
- //TODO: Check unit usages here. If unit used in
compute task we cannot just remove it.
- return deploymentUnitStore.updateClusterStatus(id,
version, REMOVING);
- }
- return completedFuture(false);
- })
- .thenCompose(success -> {
- if (success) {
- return cmgManager.logicalTopology();
+ return cmgManager.logicalTopology()
+ .thenCompose(logicalTopology -> {
+ Set<String> logicalNodes =
logicalTopology.nodes().stream()
+ .map(LogicalNode::name)
+ .collect(Collectors.toSet());
+ // Set OBSOLETE status only to nodes which
are present in the topology
+ return deploymentUnitStore.getAllNodes(id,
version)
+ .thenCompose(nodes ->
allOf(nodes.stream()
+
.filter(logicalNodes::contains)
+ .map(node ->
deploymentUnitStore.updateNodeStatus(node, id, version, OBSOLETE))
+
.toArray(CompletableFuture[]::new)))
+ .thenApply(v -> {
+ // Now the nodes are handling
the OBSOLETE node statuses and when all nodes are in the
+ // REMOVING status, the
cluster status will be changed to REMOVING
+ return true;
+ });
+ });
}
return failedFuture(new
DeploymentUnitNotFoundException(id, version));
- }).thenCompose(logicalTopologySnapshot -> allOf(
- logicalTopologySnapshot.nodes().stream()
- .map(node -> messaging.undeploy(node, id,
version))
- .toArray(CompletableFuture[]::new))
- ).thenCompose(unused -> deploymentUnitStore.remove(id,
version));
+ });
}
@Override
@@ -297,7 +323,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
@Override
public CompletableFuture<List<UnitStatuses>> nodeStatusesAsync() {
- return deploymentUnitStore.getNodeStatuses(getLocalNodeId())
+ return deploymentUnitStore.getNodeStatuses(nodeName)
.thenApply(DeploymentManagerImpl::fromUnitStatuses);
}
@@ -305,7 +331,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
public CompletableFuture<UnitStatuses> nodeStatusesAsync(String id) {
checkId(id);
- return deploymentUnitStore.getNodeStatuses(getLocalNodeId(), id)
+ return deploymentUnitStore.getNodeStatuses(nodeName, id)
.thenApply(statuses -> fromUnitStatuses(id, statuses));
}
@@ -314,7 +340,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
checkId(id);
Objects.requireNonNull(version);
- return deploymentUnitStore.getNodeStatus(getLocalNodeId(), id, version)
+ return deploymentUnitStore.getNodeStatus(nodeName, id, version)
.thenApply(DeploymentManagerImpl::extractDeploymentStatus);
}
@@ -325,7 +351,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
if (nodes.isEmpty()) {
return completedFuture(false);
}
- if (nodes.contains(getLocalNodeId())) {
+ if (nodes.contains(nodeName)) {
return completedFuture(true);
}
return messaging.downloadUnitContent(id, version, nodes)
@@ -347,23 +373,18 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
@Override
public void start() {
- DefaultNodeCallback callback = new
DefaultNodeCallback(deploymentUnitStore, messaging, deployer, clusterService,
tracker);
deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value()));
- deploymentUnitStore.registerListener(new NodeStatusWatchListener(
- deploymentUnitStore,
- this::getLocalNodeId,
- callback));
+
deploymentUnitStore.registerNodeStatusListener(nodeStatusWatchListener);
+
deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener);
messaging.subscribe();
- failover.registerTopologyChangeCallback(callback);
+ failover.registerTopologyChangeCallback(nodeStatusCallback,
clusterEventCallback);
}
@Override
public void stop() throws Exception {
tracker.cancelAll();
- }
-
- private String getLocalNodeId() {
- return clusterService.topologyService().localMember().name();
+
deploymentUnitStore.unregisterNodeStatusListener(nodeStatusWatchListener);
+
deploymentUnitStore.unregisterClusterStatusListener(clusterStatusWatchListener);
}
private static void checkId(String id) {
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
index a557b33c1c..a22d09381c 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -108,7 +108,7 @@ public class FileDeployerService {
IgniteUtils.deleteIfExistsThrowable(unitPath(id, version));
return true;
} catch (IOException e) {
- LOG.debug("Failed to get content for unit " + id + ":" +
version, e);
+ LOG.debug("Failed to undeploy unit " + id + ":" + version, e);
return false;
}
}, executor);
@@ -133,7 +133,7 @@ public class FileDeployerService {
}
});
} catch (IOException e) {
- LOG.debug("Failed to undeploy unit " + id + ":" + version, e);
+ LOG.debug("Failed to get content for unit " + id + ":" +
version, e);
}
return new UnitContent(result);
}, executor);
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
index 43985954c2..43661fd994 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
@@ -35,23 +35,13 @@ public class DeployUnitMessageTypes {
*/
public static final short DOWNLOAD_UNIT_RESPONSE = 1;
- /**
- * Message type for {@link UndeployUnitRequest}.
- */
- public static final short UNDEPLOY_UNIT_REQUEST = 2;
-
- /**
- * Message type for {@link UndeployUnitResponse}.
- */
- public static final short UNDEPLOY_UNIT_RESPONSE = 3;
-
/**
* Message type for {@link StopDeployRequest}.
*/
- public static final short STOP_DEPLOY_REQUEST = 4;
+ public static final short STOP_DEPLOY_REQUEST = 2;
/**
* Message type for {@link StopDeployResponse}.
*/
- public static final short STOP_DEPLOY_RESPONSE = 5;
+ public static final short STOP_DEPLOY_RESPONSE = 3;
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitRequest.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitRequest.java
deleted file mode 100644
index 4c4f847670..0000000000
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitRequest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.deployunit.message;
-
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/**
- * Undeploy unit request.
- */
-@Transferable(DeployUnitMessageTypes.UNDEPLOY_UNIT_REQUEST)
-public interface UndeployUnitRequest extends NetworkMessage {
- /**
- * Returns id of deployment unit.
- *
- * @return id of deployment unit.
- */
- String id();
-
- /**
- * Returns version of deployment unit.
- *
- * @return version of deployment unit.
- */
- String version();
-}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
deleted file mode 100644
index 3eccbb243f..0000000000
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.deployunit.message;
-
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
-
-/**
- * Undeploy unit response.
- */
-@Transferable(DeployUnitMessageTypes.UNDEPLOY_UNIT_RESPONSE)
-public interface UndeployUnitResponse extends NetworkMessage {
-}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallback.java
similarity index 65%
copy from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
copy to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallback.java
index a74c31be0c..dfedfdd653 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallback.java
@@ -17,51 +17,49 @@
package org.apache.ignite.internal.deployunit.metastore;
-import java.util.List;
-import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
/**
- * Listener of deployment unit node status changes.
+ * Listener of deployment unit cluster status changes.
*/
-public interface NodeEventCallback {
+public abstract class ClusterEventCallback {
/**
* Change event.
*
* @param status Deployment unit status.
- * @param holders Nodes consistent id.
*/
- default void onUpdate(UnitNodeStatus status, List<String> holders) {
+ public void onUpdate(UnitClusterStatus status) {
switch (status.status()) {
case UPLOADING:
- onUploading(status, holders);
+ onUploading(status);
break;
case DEPLOYED:
- onDeploy(status, holders);
- break;
- case REMOVING:
- onRemoving(status, holders);
+ onDeploy(status);
break;
case OBSOLETE:
- onObsolete(status, holders);
+ onObsolete(status);
+ break;
+ case REMOVING:
+ onRemoving(status);
break;
default:
break;
}
}
- default void onUploading(UnitNodeStatus status, List<String> holders) {
+ protected void onUploading(UnitClusterStatus status) {
}
- default void onDeploy(UnitNodeStatus status, List<String> holders) {
+ protected void onDeploy(UnitClusterStatus status) {
}
- default void onObsolete(UnitNodeStatus status, List<String> holders) {
+ protected void onObsolete(UnitClusterStatus status) {
}
- default void onRemoving(UnitNodeStatus status, List<String> holders) {
+ protected void onRemoving(UnitClusterStatus status) {
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
new file mode 100644
index 0000000000..b8130502e8
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.deployunit.FileDeployerService;
+import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+
+/** Listener of deployment unit cluster status changes. */
+public class ClusterEventCallbackImpl extends ClusterEventCallback {
+ private final DeploymentUnitStore deploymentUnitStore;
+
+ private final FileDeployerService deployerService;
+
+ private final ClusterManagementGroupManager cmgManager;
+
+ private final String nodeName;
+
+ /**
+ * Constructor.
+ *
+ * @param deploymentUnitStore Deployment units store.
+ * @param deployerService Deployment unit file system service.
+ * @param cmgManager Cluster management group manager.
+ * @param nodeName Node consistent ID.
+ */
+ public ClusterEventCallbackImpl(
+ DeploymentUnitStore deploymentUnitStore,
+ FileDeployerService deployerService,
+ ClusterManagementGroupManager cmgManager,
+ String nodeName
+ ) {
+ this.deploymentUnitStore = deploymentUnitStore;
+ this.deployerService = deployerService;
+ this.cmgManager = cmgManager;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public void onRemoving(UnitClusterStatus status) {
+ String id = status.id();
+ Version version = status.version();
+ // Now the deployment unit can be removed from each target node; after that, the corresponding status records are removed.
+ deploymentUnitStore.getNodeStatus(nodeName, id,
version).thenAccept(nodeStatus -> {
+ if (nodeStatus != null && nodeStatus.status() == REMOVING) {
+ undeploy(id, version);
+ }
+ });
+ }
+
+ private void undeploy(String id, Version version) {
+ deployerService.undeploy(id, version).thenAccept(success -> {
+ if (success) {
+ deploymentUnitStore.removeNodeStatus(nodeName, id,
version).thenAccept(successRemove -> {
+ if (successRemove) {
+ removeClusterStatus(id, version);
+ }
+ });
+ }
+ });
+ }
+
+ private void removeClusterStatus(String id, Version version) {
+ cmgManager.logicalTopology().thenAccept(logicalTopology -> {
+ Set<String> logicalNodes = logicalTopology.nodes().stream()
+ .map(LogicalNode::name)
+ .collect(Collectors.toSet());
+ deploymentUnitStore.getAllNodes(id, version).thenAccept(nodes -> {
+ boolean emptyTopology = nodes.stream()
+ .filter(logicalNodes::contains)
+ .findAny()
+ .isEmpty();
+ if (emptyTopology) {
+ deploymentUnitStore.removeClusterStatus(id, version);
+ }
+ });
+ });
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterStatusWatchListener.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterStatusWatchListener.java
new file mode 100644
index 0000000000..af71ee6fb0
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterStatusWatchListener.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+
+/**
+ * Cluster statuses store watch listener.
+ */
+public class ClusterStatusWatchListener implements WatchListener {
+ private static final IgniteLogger LOG =
Loggers.forClass(ClusterStatusWatchListener.class);
+
+ private final ClusterEventCallback clusterEventCallback;
+
+ public ClusterStatusWatchListener(ClusterEventCallback
clusterEventCallback) {
+ this.clusterEventCallback = clusterEventCallback;
+ }
+
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ for (EntryEvent e : event.entryEvents()) {
+ byte[] value = e.newEntry().value();
+ if (value != null) {
+ UnitClusterStatus unitStatus =
UnitClusterStatus.deserialize(value);
+ clusterEventCallback.onUpdate(unitStatus);
+ }
+ }
+ return completedFuture(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.warn("Failed to process metastore deployment unit event. ", e);
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
index 396124500a..37c44d76d3 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
@@ -18,14 +18,20 @@
package org.apache.ignite.internal.deployunit.metastore;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import java.util.Objects;
+import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.internal.deployunit.DeploymentStatus;
+import org.apache.ignite.internal.deployunit.FileDeployerService;
+import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
/**
* Deployment unit failover.
@@ -35,54 +41,84 @@ public class DeploymentUnitFailover {
private final DeploymentUnitStore deploymentUnitStore;
- private final ClusterService clusterService;
+ private final FileDeployerService deployer;
+
+ private final String nodeName;
/**
* Constructor.
*
* @param logicalTopology Logical topology service.
* @param deploymentUnitStore Deployment units store.
- * @param clusterService Cluster service.
+ * @param deployer Deployment unit file system service.
+ * @param nodeName Node consistent ID.
*/
- public DeploymentUnitFailover(
- LogicalTopologyService logicalTopology,
- DeploymentUnitStore deploymentUnitStore,
- ClusterService clusterService
- ) {
+ public DeploymentUnitFailover(LogicalTopologyService logicalTopology,
DeploymentUnitStore deploymentUnitStore,
+ FileDeployerService deployer, String nodeName) {
this.logicalTopology = logicalTopology;
this.deploymentUnitStore = deploymentUnitStore;
- this.clusterService = clusterService;
+ this.deployer = deployer;
+ this.nodeName = nodeName;
}
/**
* Register {@link NodeEventCallback} as topology change callback.
*
- * @param callback Node status callback.
+ * @param nodeEventCallback Node status callback.
+ * @param clusterEventCallback Cluster status callback.
*/
- public void registerTopologyChangeCallback(NodeEventCallback callback) {
+ public void registerTopologyChangeCallback(NodeEventCallback
nodeEventCallback, ClusterEventCallback clusterEventCallback) {
logicalTopology.addEventListener(new LogicalTopologyEventListener() {
@Override
public void onNodeJoined(LogicalNode joinedNode,
LogicalTopologySnapshot newTopology) {
String consistentId = joinedNode.name();
- if (Objects.equals(consistentId, getLocalNodeId())) {
+ if (Objects.equals(consistentId, nodeName)) {
deploymentUnitStore.getNodeStatuses(consistentId)
- .thenAccept(nodeStatuses ->
nodeStatuses.forEach(nodeStatus -> {
- if (nodeStatus.status() == UPLOADING) {
-
deploymentUnitStore.getClusterStatus(nodeStatus.id(), nodeStatus.version())
- .thenAccept(clusterStatus -> {
- if (clusterStatus.status() ==
UPLOADING || clusterStatus.status() == DEPLOYED) {
-
deploymentUnitStore.getAllNodes(nodeStatus.id(), nodeStatus.version())
- .thenAccept(nodes
-> callback.onUploading(nodeStatus, nodes));
- }
- });
- }
- }));
+ .thenAccept(nodeStatuses ->
nodeStatuses.forEach(unitNodeStatus ->
+
deploymentUnitStore.getClusterStatus(unitNodeStatus.id(),
unitNodeStatus.version())
+ .thenAccept(unitClusterStatus ->
+
processStatus(unitClusterStatus, unitNodeStatus, nodeEventCallback))));
}
}
});
}
- private String getLocalNodeId() {
- return clusterService.topologyService().localMember().name();
+ private void processStatus(UnitClusterStatus unitClusterStatus,
UnitNodeStatus unitNodeStatus, NodeEventCallback nodeEventCallback) {
+ String id = unitNodeStatus.id();
+ Version version = unitNodeStatus.version();
+ if (unitClusterStatus == null) {
+ deployer.undeploy(id, version)
+ .thenAccept(success -> {
+ if (success) {
+ deploymentUnitStore.removeNodeStatus(nodeName, id,
version);
+ }
+ });
+ return;
+ }
+ DeploymentStatus clusterStatus = unitClusterStatus.status();
+ DeploymentStatus nodeStatus = unitNodeStatus.status();
+ switch (clusterStatus) {
+ case UPLOADING: // fallthrough
+ case DEPLOYED:
+ if (nodeStatus == UPLOADING) {
+ deploymentUnitStore.getAllNodeStatuses(id, version)
+ .thenAccept(nodes ->
nodeEventCallback.onUpdate(unitNodeStatus, nodes));
+ }
+ break;
+ case OBSOLETE:
+ if (nodeStatus == DEPLOYED || nodeStatus == OBSOLETE) {
+ deploymentUnitStore.updateNodeStatus(nodeName, id,
version, REMOVING);
+ }
+ break;
+ case REMOVING:
+ deploymentUnitStore.getAllNodeStatuses(id, version)
+ .thenAccept(nodes -> {
+ UnitNodeStatus status = new UnitNodeStatus(id,
version, REMOVING, nodeName);
+ nodeEventCallback.onUpdate(status, nodes);
+ });
+ break;
+ default:
+ break;
+ }
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
index 60f18309bf..7e64f168b8 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
@@ -32,11 +32,32 @@ import
org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
*/
public interface DeploymentUnitStore {
/**
- * Register node statuses change events listener.
+ * Registers node statuses change events listener.
*
* @param listener Node statuses update listener.
*/
- void registerListener(NodeStatusWatchListener listener);
+ void registerNodeStatusListener(NodeStatusWatchListener listener);
+
+ /**
+ * Unregisters node statuses change events listener.
+ *
+ * @param listener Node statuses update listener.
+ */
+ void unregisterNodeStatusListener(NodeStatusWatchListener listener);
+
+ /**
+ * Registers cluster statuses change events listener.
+ *
+ * @param listener Cluster statuses update listener.
+ */
+ void registerClusterStatusListener(ClusterStatusWatchListener listener);
+
+ /**
+ * Unregisters cluster statuses change events listener.
+ *
+ * @param listener Cluster statuses update listener.
+ */
+ void unregisterClusterStatusListener(ClusterStatusWatchListener listener);
/**
* Returns cluster statuses of all existed deployment units.
@@ -154,11 +175,29 @@ public interface DeploymentUnitStore {
CompletableFuture<List<String>> getAllNodes(String id, Version version);
/**
- * Removes all data for deployment unit.
+ * Returns a list of node statuses where unit with provided identifier and
version is deployed.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @return A list of node statuses where unit with provided identifier and
version is deployed or empty list.
+ */
+ CompletableFuture<List<UnitNodeStatus>> getAllNodeStatuses(String id,
Version version);
+
+ /**
+ * Removes cluster status.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment version identifier.
+ * @return Future with {@code true} result if removed successfully.
+ */
+ CompletableFuture<Boolean> removeClusterStatus(String id, Version version);
+
+ /**
+ * Removes the status of the deployment unit on the node with the given {@code nodeId}.
*
* @param id Deployment unit identifier.
* @param version Deployment version identifier.
* @return Future with {@code true} result if removed successfully.
*/
- CompletableFuture<Boolean> remove(String id, Version version);
+ CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id,
Version version);
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
index 6c6cd51b60..b4a7f30016 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
@@ -26,9 +26,6 @@ import static
org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -38,16 +35,12 @@ import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import
org.apache.ignite.internal.deployunit.metastore.accumulator.ClusterStatusAccumulator;
-import
org.apache.ignite.internal.deployunit.metastore.accumulator.KeyAccumulator;
import
org.apache.ignite.internal.deployunit.metastore.accumulator.NodeStatusAccumulator;
import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Condition;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.lang.ByteArray;
@@ -67,10 +60,25 @@ public class DeploymentUnitStoreImpl implements
DeploymentUnitStore {
}
@Override
- public void registerListener(NodeStatusWatchListener listener) {
+ public void registerNodeStatusListener(NodeStatusWatchListener listener) {
metaStorage.registerPrefixWatch(NodeStatusKey.builder().build().toByteArray(),
listener);
}
+ @Override
+ public void unregisterNodeStatusListener(NodeStatusWatchListener listener)
{
+ metaStorage.unregisterWatch(listener);
+ }
+
+ @Override
+ public void registerClusterStatusListener(ClusterStatusWatchListener
listener) {
+
metaStorage.registerPrefixWatch(ClusterStatusKey.builder().build().toByteArray(),
listener);
+ }
+
+ @Override
+ public void unregisterClusterStatusListener(ClusterStatusWatchListener
listener) {
+ metaStorage.unregisterWatch(listener);
+ }
+
@Override
public CompletableFuture<List<UnitClusterStatus>> getClusterStatuses() {
CompletableFuture<List<UnitClusterStatus>> result = new
CompletableFuture<>();
@@ -182,31 +190,25 @@ public class DeploymentUnitStoreImpl implements
DeploymentUnitStore {
}
@Override
- public CompletableFuture<Boolean> remove(String id, Version version) {
- ByteArray key =
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
- CompletableFuture<List<byte[]>> nodesFuture = new
CompletableFuture<>();
-
metaStorage.prefix(NodeStatusKey.builder().id(id).version(version).build().toByteArray())
- .subscribe(new KeyAccumulator().toSubscriber(nodesFuture));
-
- return nodesFuture.thenCompose(nodes ->
- metaStorage.invoke(existsAll(key, nodes), removeAll(key,
nodes), Collections.emptyList())
- );
+ public CompletableFuture<List<UnitNodeStatus>> getAllNodeStatuses(String
id, Version version) {
+ CompletableFuture<List<UnitNodeStatus>> result = new
CompletableFuture<>();
+ ByteArray nodes =
NodeStatusKey.builder().id(id).version(version).build().toByteArray();
+ metaStorage.prefix(nodes).subscribe(new
NodeStatusAccumulator().toSubscriber(result));
+ return result;
}
- private static Condition existsAll(ByteArray key, List<byte[]> nodeKeys) {
- Condition result = exists(key);
- for (byte[] keyArr : nodeKeys) {
- result = Conditions.and(result, exists(new ByteArray(keyArr)));
- }
- return result;
+ @Override
+ public CompletableFuture<Boolean> removeClusterStatus(String id, Version
version) {
+ ByteArray key =
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
+
+ return metaStorage.invoke(exists(key), Operations.remove(key), noop());
}
- private static Collection<Operation> removeAll(ByteArray key, List<byte[]>
keys) {
- List<Operation> operations = new ArrayList<>();
- operations.add(Operations.remove(key));
+ @Override
+ public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String
id, Version version) {
+ ByteArray key =
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray();
-
keys.stream().map(ByteArray::new).map(Operations::remove).collect(Collectors.toCollection(()
-> operations));
- return operations;
+ return metaStorage.invoke(exists(key), Operations.remove(key), noop());
}
/**
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
index a74c31be0c..6e10894de0 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
@@ -18,50 +18,51 @@
package org.apache.ignite.internal.deployunit.metastore;
import java.util.List;
+import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
/**
* Listener of deployment unit node status changes.
*/
-public interface NodeEventCallback {
+public abstract class NodeEventCallback {
/**
* Change event.
*
* @param status Deployment unit status.
- * @param holders Nodes consistent id.
+ * @param holders Node statuses.
*/
- default void onUpdate(UnitNodeStatus status, List<String> holders) {
+ public void onUpdate(UnitNodeStatus status, List<UnitNodeStatus> holders) {
switch (status.status()) {
case UPLOADING:
- onUploading(status, holders);
+ onUploading(status.id(), status.version(), holders);
break;
case DEPLOYED:
- onDeploy(status, holders);
+ onDeploy(status.id(), status.version(), holders);
break;
case REMOVING:
- onRemoving(status, holders);
+ onRemoving(status.id(), status.version(), holders);
break;
case OBSOLETE:
- onObsolete(status, holders);
+ onObsolete(status.id(), status.version(), holders);
break;
default:
break;
}
}
- default void onUploading(UnitNodeStatus status, List<String> holders) {
+ protected void onUploading(String id, Version version,
List<UnitNodeStatus> holders) {
}
- default void onDeploy(UnitNodeStatus status, List<String> holders) {
+ protected void onDeploy(String id, Version version, List<UnitNodeStatus>
holders) {
}
- default void onObsolete(UnitNodeStatus status, List<String> holders) {
+ protected void onObsolete(String id, Version version, List<UnitNodeStatus>
holders) {
}
- default void onRemoving(UnitNodeStatus status, List<String> holders) {
+ protected void onRemoving(String id, Version version, List<UnitNodeStatus>
holders) {
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
index 67a2d684f3..4b8ff48294 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
@@ -23,7 +23,6 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Supplier;
import org.apache.ignite.internal.deployunit.metastore.status.NodeStatusKey;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -42,7 +41,7 @@ public class NodeStatusWatchListener implements WatchListener
{
private final DeploymentUnitStore deploymentUnitStore;
- private final Supplier<String> localNodeProvider;
+ private final String nodeName;
private final NodeEventCallback callback;
@@ -53,14 +52,12 @@ public class NodeStatusWatchListener implements
WatchListener {
* Constructor.
*
* @param deploymentUnitStore Unit statuses store.
- * @param localNodeProvider Node consistent identifier provider.
+ * @param nodeName Node consistent ID.
* @param callback Node event callback.
*/
- public NodeStatusWatchListener(DeploymentUnitStore deploymentUnitStore,
- Supplier<String> localNodeProvider,
- NodeEventCallback callback) {
+ public NodeStatusWatchListener(DeploymentUnitStore deploymentUnitStore,
String nodeName, NodeEventCallback callback) {
this.deploymentUnitStore = deploymentUnitStore;
- this.localNodeProvider = localNodeProvider;
+ this.nodeName = nodeName;
this.callback = callback;
}
@@ -74,15 +71,14 @@ public class NodeStatusWatchListener implements
WatchListener {
NodeStatusKey nodeStatusKey = NodeStatusKey.fromBytes(key);
- if (!Objects.equals(localNodeProvider.get(),
nodeStatusKey.nodeId())
- || value == null) {
+ if (!Objects.equals(nodeName, nodeStatusKey.nodeId()) || value ==
null) {
continue;
}
UnitNodeStatus nodeStatus = UnitNodeStatus.deserialize(value);
CompletableFuture.supplyAsync(() -> nodeStatus, executor)
- .thenComposeAsync(status ->
deploymentUnitStore.getAllNodes(status.id(), status.version()), executor)
+ .thenComposeAsync(status ->
deploymentUnitStore.getAllNodeStatuses(status.id(), status.version()), executor)
.thenAccept(nodes -> callback.onUpdate(nodeStatus, nodes));
}
return completedFuture(null);
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 234a12ac6c..7405e4ca5d 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -34,6 +35,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
+import
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
@@ -47,7 +50,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -61,12 +63,21 @@ public class DeploymentUnitStoreImplTest {
private final VaultManager vaultManager = new VaultManager(new
InMemoryVaultService());
- private final List<UnitNodeStatus> history =
Collections.synchronizedList(new ArrayList<>());
+ private final List<UnitNodeStatus> nodeHistory =
Collections.synchronizedList(new ArrayList<>());
- private final NodeEventCallback listener = new NodeEventCallback() {
+ private final NodeEventCallback nodeEventCallback = new
NodeEventCallback() {
@Override
- public void onUpdate(UnitNodeStatus status, List<String> holders) {
- history.add(status);
+ public void onUpdate(UnitNodeStatus status, List<UnitNodeStatus>
holders) {
+ nodeHistory.add(status);
+ }
+ };
+
+ private final List<UnitClusterStatus> clusterHistory =
Collections.synchronizedList(new ArrayList<>());
+
+ private final ClusterEventCallback clusterEventCallback = new
ClusterEventCallback() {
+ @Override
+ public void onUpdate(UnitClusterStatus status) {
+ clusterHistory.add(status);
}
};
@@ -77,12 +88,14 @@ public class DeploymentUnitStoreImplTest {
@BeforeEach
public void setup() {
- history.clear();
+ nodeHistory.clear();
+ clusterHistory.clear();
KeyValueStorage storage = new RocksDbKeyValueStorage("test", workDir);
MetaStorageManager metaStorageManager =
StandaloneMetaStorageManager.create(vaultManager, storage);
metastore = new DeploymentUnitStoreImpl(metaStorageManager);
- metastore.registerListener(new NodeStatusWatchListener(metastore, ()
-> LOCAL_NODE, listener));
+ metastore.registerNodeStatusListener(new
NodeStatusWatchListener(metastore, LOCAL_NODE, nodeEventCallback));
+ metastore.registerClusterStatusListener(new
ClusterStatusWatchListener(clusterEventCallback));
vaultManager.start();
metaStorageManager.start();
@@ -104,7 +117,7 @@ public class DeploymentUnitStoreImplTest {
assertThat(metastore.getClusterStatus(id, version),
willBe(new UnitClusterStatus(id, version, DEPLOYED,
Set.of())));
- assertThat(metastore.remove(id, version), willBe(true));
+ assertThat(metastore.removeClusterStatus(id, version), willBe(true));
assertThat(metastore.getClusterStatus(id, version),
willBe(nullValue()));
}
@@ -144,12 +157,12 @@ public class DeploymentUnitStoreImplTest {
willBe(contains((new UnitClusterStatus(id, version, DEPLOYED,
Set.of(node1, node2, node3)))))
);
- assertThat(metastore.remove(id, version), willBe(true));
+ assertThat(metastore.removeNodeStatus(node1, id, version),
willBe(true));
assertThat(metastore.getNodeStatus(node1, id, version),
willBe(nullValue()));
}
@Test
- public void testNodeEventListener() throws InterruptedException {
+ public void testNodeEventListener() {
String id = "id5";
Version version = Version.parseVersion("1.1.1");
String node1 = LOCAL_NODE;
@@ -159,12 +172,31 @@ public class DeploymentUnitStoreImplTest {
assertThat(metastore.updateNodeStatus(node1, id, version, OBSOLETE),
willBe(true));
assertThat(metastore.updateNodeStatus(node1, id, version, REMOVING),
willBe(true));
- Awaitility.await().untilAsserted(() ->
- assertThat(history, containsInAnyOrder(
+ await().untilAsserted(() ->
+ assertThat(nodeHistory, containsInAnyOrder(
new UnitNodeStatus(id, version, UPLOADING, node1),
new UnitNodeStatus(id, version, DEPLOYED, node1),
new UnitNodeStatus(id, version, OBSOLETE, node1),
new UnitNodeStatus(id, version, REMOVING, node1)
)));
}
+
+ @Test
+ public void testClusterEventListener() {
+ String id = "id6";
+ Version version = Version.parseVersion("1.1.1");
+
+ assertThat(metastore.createClusterStatus(id, version, Set.of()),
willBe(true));
+ assertThat(metastore.updateClusterStatus(id, version, DEPLOYED),
willBe(true));
+ assertThat(metastore.updateClusterStatus(id, version, OBSOLETE),
willBe(true));
+ assertThat(metastore.updateClusterStatus(id, version, REMOVING),
willBe(true));
+
+ await().untilAsserted(() ->
+ assertThat(clusterHistory, containsInAnyOrder(
+ new UnitClusterStatus(id, version, UPLOADING,
Set.of()),
+ new UnitClusterStatus(id, version, DEPLOYED, Set.of()),
+ new UnitClusterStatus(id, version, OBSOLETE, Set.of()),
+ new UnitClusterStatus(id, version, REMOVING, Set.of())
+ )));
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
index a5b68c381f..fbe3ff5c93 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
@@ -17,9 +17,13 @@
package org.apache.ignite.internal.deployment;
-import java.util.Arrays;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -38,42 +42,51 @@ public class ItDeploymentUnitFailoverTest extends ClusterPerTestIntegrationTest
files = new DeployFiles(workDir);
}
- @Override
- protected int initialNodes() {
- return 9;
- }
-
- @Override
- protected int[] cmgMetastoreNodes() {
- return new int[] {6, 7, 8};
- }
-
@Test
public void testDeployWithNodeStop() {
- IgniteImpl cmgNode = cluster.node(8);
-
- List<String> cmgNodes = Arrays.stream(cmgMetastoreNodes())
- .mapToObj(i -> node(i).name())
- .collect(Collectors.toList());
+ int nodeIndex = 1;
+ IgniteImpl node = node(nodeIndex);
+ IgniteImpl cmgNode = node(0);
- // Deploy to all CMG nodes, not only to majority
+ // Deploy to majority and additional node
Unit big = files.deployAndVerify(
"id1",
Version.parseVersion("1.0.0"),
false,
List.of(files.bigFile()),
null,
- cmgNodes,
- cluster.node(3)
+ List.of(node.name()),
+ cmgNode
);
- stopNode(8);
+ stopNode(nodeIndex);
- big.waitUnitClean(cmgNode);
- big.waitUnitReplica(cluster.node(6));
- big.waitUnitReplica(cluster.node(7));
-
- cmgNode = startNode(8);
+ big.waitUnitClean(node);
big.waitUnitReplica(cmgNode);
+
+ node = startNode(nodeIndex);
+ big.waitUnitReplica(node);
+ }
+
+ /**
+ * Deploys a unit via deployAndVerify with node 1 in the node list, stops node 1, undeploys the unit while that node is down
+ * and then waits for the unit to be cleaned on the node after it has been started again.
+ */
+ @Test
+ public void testUndeployWithNodeStop() {
+ int nodeIndex = 1;
+ String id = "id1";
+ Version version = Version.parseVersion("1.0.0");
+ Unit unit = files.deployAndVerify(
+ id, version, false,
+ List.of(files.smallFile()),
+ null, List.of(node(nodeIndex).name()),
+ node(0)
+ );
+
+ // Wait until the node that is about to be stopped reports the unit as DEPLOYED.
+ await().until(() -> node(nodeIndex).deployment().clusterStatusAsync(id, version), willBe(DEPLOYED));
+
+ stopNode(nodeIndex);
+
+ // The undeploy is requested while the node is offline and is still expected to complete.
+ assertThat(unit.undeployAsync(), willCompleteSuccessfully());
+
+ // Restart the stopped node and wait for the unit to be cleaned on it.
+ IgniteImpl restartedNode = startNode(nodeIndex);
+ unit.waitUnitClean(restartedNode);
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index 42e3561178..ceb026d673 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -26,9 +26,9 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.nullValue;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.version.Version;
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.deployunit.IgniteDeployment;
import org.apache.ignite.internal.deployunit.InitialDeployMode;
import org.apache.ignite.internal.deployunit.UnitStatuses;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -56,16 +57,16 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
@Test
public void testDeploy() {
String id = "test";
- Unit unit = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), cluster.node(1));
+ Unit unit = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), node(1));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
unit.waitUnitReplica(cmg);
UnitStatuses status = buildStatus(id, unit);
await().timeout(2, SECONDS)
.pollDelay(500, MILLISECONDS)
- .until(() -> node(2).deployment().clusterStatusesAsync(), willBe(Collections.singletonList(status)));
+ .until(() -> node(2).deployment().clusterStatusesAsync(), willBe(List.of(status)));
}
@Test
@@ -75,40 +76,40 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
Version.parseVersion("1.1.0"),
false,
List.of(files.smallFile(), files.mediumFile()),
- cluster.node(1)
+ node(1)
);
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
unit.waitUnitReplica(cmg);
UnitStatuses status = buildStatus(id, unit);
await().timeout(2, SECONDS)
.pollDelay(500, MILLISECONDS)
- .until(() -> node(2).deployment().clusterStatusesAsync(), willBe(Collections.singletonList(status)));
+ .until(() -> node(2).deployment().clusterStatusesAsync(), willBe(List.of(status)));
}
@Test
public void testDeployUndeploy() {
- Unit unit = files.deployAndVerifySmall("test", Version.parseVersion("1.1.0"), cluster.node(1));
+ Unit unit = files.deployAndVerifySmall("test", Version.parseVersion("1.1.0"), node(1));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
unit.waitUnitReplica(cmg);
unit.undeploy();
unit.waitUnitClean(cmg);
CompletableFuture<List<UnitStatuses>> list = node(2).deployment().clusterStatusesAsync();
- assertThat(list, willBe(Collections.emptyList()));
+ assertThat(list, willBe(empty()));
}
@Test
public void testDeployTwoUnits() {
String id = "test";
- Unit unit1 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), cluster.node(1));
- Unit unit2 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.1"), cluster.node(2));
+ Unit unit1 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), node(1));
+ Unit unit2 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.1"), node(2));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
unit1.waitUnitReplica(cmg);
unit2.waitUnitReplica(cmg);
@@ -125,10 +126,10 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
@Test
public void testDeployTwoUnitsAndUndeployOne() {
String id = "test";
- Unit unit1 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), cluster.node(1));
- Unit unit2 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.1"), cluster.node(2));
+ Unit unit1 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), node(1));
+ Unit unit2 = files.deployAndVerifySmall(id, Version.parseVersion("1.1.1"), node(2));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
unit1.waitUnitReplica(cmg);
unit2.waitUnitReplica(cmg);
@@ -147,12 +148,12 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
public void testDeploymentStatus() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit unit = files.deployAndVerifyMedium(id, version, cluster.node(1));
+ Unit unit = files.deployAndVerifyMedium(id, version, node(1));
CompletableFuture<DeploymentStatus> status = node(2).deployment().clusterStatusAsync(id, version);
assertThat(status, willBe(UPLOADING));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
unit.waitUnitReplica(cmg);
await().timeout(2, SECONDS)
@@ -167,16 +168,17 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
assertThat(node(2).deployment().clusterStatusAsync(id, version), willBe(nullValue()));
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19757")
@Test
public void testRedeploy() {
String id = "test";
String version = "1.1.0";
- Unit smallUnit = files.deployAndVerifySmall(id, Version.parseVersion(version), cluster.node(1));
+ Unit smallUnit = files.deployAndVerifySmall(id, Version.parseVersion(version), node(1));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
smallUnit.waitUnitReplica(cmg);
- Unit mediumUnit = files.deployAndVerify(id, Version.parseVersion(version), true, List.of(files.mediumFile()), cluster.node(1));
+ Unit mediumUnit = files.deployAndVerify(id, Version.parseVersion(version), true, List.of(files.mediumFile()), node(1));
mediumUnit.waitUnitReplica(cmg);
smallUnit.waitUnitClean(smallUnit.deployedNode());
@@ -187,12 +189,12 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
public void testOnDemandDeploy() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit smallUnit = files.deployAndVerifySmall(id, version, cluster.node(1));
+ Unit smallUnit = files.deployAndVerifySmall(id, version, node(1));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
smallUnit.waitUnitReplica(cmg);
- IgniteImpl onDemandDeployNode = cluster.node(2);
+ IgniteImpl onDemandDeployNode = node(2);
CompletableFuture<Boolean> onDemandDeploy = onDemandDeployNode.deployment().onDemandDeploy(id, version);
assertThat(onDemandDeploy, willBe(true));
@@ -203,12 +205,12 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
public void testOnDemandDeployToDeployedNode() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit smallUnit = files.deployAndVerifySmall(id, version, cluster.node(1));
+ Unit smallUnit = files.deployAndVerifySmall(id, version, node(1));
- IgniteImpl cmg = cluster.node(0);
+ IgniteImpl cmg = node(0);
smallUnit.waitUnitReplica(cmg);
- IgniteImpl onDemandDeployNode = cluster.node(1);
+ IgniteImpl onDemandDeployNode = node(1);
CompletableFuture<Boolean> onDemandDeploy = onDemandDeployNode.deployment().onDemandDeploy(id, version);
assertThat(onDemandDeploy, willBe(true));
@@ -218,39 +220,30 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
@Test
public void testDeployToCmg() {
String id = "test";
- Version version = Version.parseVersion("1.1.0");
- Unit smallUnit = files.deployAndVerifySmall(id, version, cluster.node(0));
+ Unit smallUnit = files.deployAndVerifySmall(id, Version.parseVersion("1.1.0"), node(0));
- await().untilAsserted(() -> {
- CompletableFuture<List<UnitStatuses>> list = node(0).deployment().clusterStatusesAsync();
- assertThat(list, willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
- });
+ await().until(() -> node(0).deployment().clusterStatusesAsync(id), willBe(buildStatus(id, smallUnit)));
}
@Test
public void testDeployToSpecificNode() {
String id = "test";
- Version version = Version.parseVersion("1.1.0");
Unit smallUnit = files.deployAndVerify(
- id, version, false, List.of(files.smallFile()),
+ id, Version.parseVersion("1.1.0"), false, List.of(files.smallFile()),
null, List.of(node(1).name()),
node(0)
);
smallUnit.waitUnitReplica(node(1));
- await().untilAsserted(() -> {
- CompletableFuture<List<UnitStatuses>> list = node(0).deployment().clusterStatusesAsync();
- assertThat(list, willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
- });
+ await().until(() -> node(0).deployment().clusterStatusesAsync(id), willBe(buildStatus(id, smallUnit)));
}
@Test
public void testDeployToAll() {
String id = "test";
- Version version = Version.parseVersion("1.1.0");
Unit smallUnit = files.deployAndVerify(
- id, version, false, List.of(files.smallFile()),
+ id, Version.parseVersion("1.1.0"), false, List.of(files.smallFile()),
InitialDeployMode.ALL, List.of(),
node(0)
);
@@ -258,9 +251,6 @@ public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
smallUnit.waitUnitReplica(node(1));
smallUnit.waitUnitReplica(node(2));
- await().untilAsserted(() -> {
- CompletableFuture<List<UnitStatuses>> list = node(0).deployment().clusterStatusesAsync();
- assertThat(list, willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
- });
+ await().until(() -> node(0).deployment().clusterStatusesAsync(id), willBe(buildStatus(id, smallUnit)));
}
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 322252bfd5..c8d20a5375 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -539,7 +539,8 @@ public class IgniteImpl implements Ignite {
logicalTopologyService,
workDir,
nodeConfigRegistry.getConfiguration(DeploymentConfiguration.KEY),
- cmgMgr
+ cmgMgr,
+ name
);
deploymentManager = deploymentManagerImpl;