This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 995eba14b3 IGNITE-19518 Validate unit after node restart (#2151)
995eba14b3 is described below
commit 995eba14b3cbe23431d75e615bd33eaae145c4a1
Author: Mikhail <[email protected]>
AuthorDate: Mon Jun 12 12:30:34 2023 +0300
IGNITE-19518 Validate unit after node restart (#2151)
---
.../internal/deployunit/DefaultNodeCallback.java | 98 +++++++
.../deployunit/DeployMessagingService.java | 6 +-
.../internal/deployunit/DeploymentManagerImpl.java | 59 ++---
.../{DeployTracker.java => DownloadTracker.java} | 24 +-
.../metastore/DeploymentUnitFailover.java | 88 +++++++
.../deployunit/metastore/DeploymentUnitStore.java | 2 -
.../deployunit/metastore/NodeEventCallback.java | 38 ++-
.../metastore/NodeStatusWatchListener.java | 7 +-
.../metastore/DeploymentUnitStoreImplTest.java | 7 +-
.../java/org/apache/ignite/internal/Cluster.java | 39 ++-
.../internal/ClusterPerTestIntegrationTest.java | 6 +-
.../ignite/internal/deployment/DeployFile.java | 56 ++++
.../ignite/internal/deployment/DeployFiles.java | 155 +++++++++++
.../deployment/ItDeploymentUnitFailoverTest.java | 62 +++++
.../internal/deployment/ItDeploymentUnitTest.java | 284 ++++-----------------
.../apache/ignite/internal/deployment/Unit.java | 129 ++++++++++
.../apache/ignite/internal/jdbc/ItJdbcTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
18 files changed, 751 insertions(+), 312 deletions(-)
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
new file mode 100644
index 0000000000..3d26681e33
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
+
+import java.util.List;
+import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
+import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
+import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Default implementation of {@link NodeEventCallback}.
+ *
+ * <p>Only two of the callback hooks are overridden here:
+ * <ul>
+ * <li>{@code onUploading}: starts, through the {@link DownloadTracker}, a download of the unit
+ * content from the given holders, hands the content to the {@link FileDeployerService} and, when
+ * the deployer reports success, requests the {@code DEPLOYED} node status for the local node.</li>
+ * <li>{@code onDeploy}: reads the cluster status of the unit and, when the given holders contain
+ * every node returned by {@code initialNodesToDeploy()}, requests the {@code DEPLOYED} cluster
+ * status for the unit.</li>
+ * </ul>
+ *
+ * <p>The local node is identified by the name of the local member of the topology service.
+ *
+ * <p>NOTE(review): in {@code onUploading} the {@code thenApply} stage returns the result of
+ * {@code updateNodeStatus} from one branch and the plain {@code deployed} flag from the other.
+ * If {@code updateNodeStatus} returns a future (its signature is not visible here, TODO confirm),
+ * the tracked future completes without waiting for the status update; {@code thenCompose} would
+ * be required to wait for it.
+ *
+ * <p>NOTE(review): neither hook attaches error handling to the futures it creates, and
+ * {@code onDeploy} discards the future it builds.
+ */
+public class DefaultNodeCallback implements NodeEventCallback {
+ private final DeploymentUnitStore deploymentUnitStore;
+
+ private final DeployMessagingService messaging;
+
+ private final FileDeployerService deployer;
+
+ private final ClusterService clusterService;
+
+ private final DownloadTracker tracker;
+
+ /**
+ * Constructor.
+ *
+ * @param deploymentUnitStore Deployment units store.
+ * @param messaging Deployment messaging service.
+ * @param deployer Deployment unit file system service.
+ * @param clusterService Cluster service.
+ * @param tracker Tracker through which unit content downloads are started.
+ */
+ public DefaultNodeCallback(
+ DeploymentUnitStore deploymentUnitStore,
+ DeployMessagingService messaging,
+ FileDeployerService deployer,
+ ClusterService clusterService,
+ DownloadTracker tracker
+ ) {
+ this.deploymentUnitStore = deploymentUnitStore;
+ this.messaging = messaging;
+ this.deployer = deployer;
+ this.clusterService = clusterService;
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void onUploading(UnitNodeStatus status, List<String> holders) {
+ tracker.track(status.id(), status.version(),
+ () -> messaging.downloadUnitContent(status.id(),
status.version(), holders)
+ .thenCompose(content -> deployer.deploy(status.id(),
status.version(), content))
+ .thenApply(deployed -> {
+ if (deployed) {
+ return deploymentUnitStore.updateNodeStatus(
+ getLocalNodeId(),
+ status.id(),
+ status.version(),
+ DEPLOYED);
+ }
+ return deployed;
+ })
+ );
+ }
+
+ @Override
+ public void onDeploy(UnitNodeStatus status, List<String> holders) {
+ deploymentUnitStore.getClusterStatus(status.id(), status.version())
+ .thenApply(UnitClusterStatus::initialNodesToDeploy)
+ .thenApply(holders::containsAll)
+ .thenAccept(allRequiredDeployed -> {
+ if (allRequiredDeployed) {
+ deploymentUnitStore.updateClusterStatus(status.id(),
status.version(), DEPLOYED);
+ }
+ });
+ }
+
+ private String getLocalNodeId() {
+ return clusterService.topologyService().localMember().name();
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index 31d652446a..1834907736 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -65,7 +65,7 @@ public class DeployMessagingService {
/**
* Tracker of deploy actions.
*/
- private final DeployTracker tracker;
+ private final DownloadTracker tracker;
/**
* Constructor.
@@ -79,7 +79,7 @@ public class DeployMessagingService {
ClusterService clusterService,
ClusterManagementGroupManager cmgManager,
FileDeployerService deployerService,
- DeployTracker tracker
+ DownloadTracker tracker
) {
this.clusterService = clusterService;
this.cmgManager = cmgManager;
@@ -168,7 +168,7 @@ public class DeployMessagingService {
}
private void processStopDeployRequest(StopDeployRequest request, String
senderConsistentId, long correlationId) {
- tracker.cancelIfDeploy(request.id(),
Version.parseVersion(request.version()));
+ tracker.cancelIfDownloading(request.id(),
Version.parseVersion(request.version()));
clusterService.messagingService()
.respond(senderConsistentId,
StopDeployResponseImpl.builder().build(), correlationId);
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 8f0d462066..2be1863a52 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -23,30 +23,28 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.OBSOLETE;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.REMOVING;
-import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExistsException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitFailover;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl;
import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
-import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -58,7 +56,6 @@ import org.jetbrains.annotations.Nullable;
* Deployment manager implementation.
*/
public class DeploymentManagerImpl implements IgniteDeployment {
-
private static final IgniteLogger LOG =
Loggers.forClass(DeploymentManagerImpl.class);
/**
@@ -99,7 +96,12 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
/**
* Deploy tracker.
*/
- private final DeployTracker tracker;
+ private final DownloadTracker tracker;
+
+ /**
+ * Failover.
+ */
+ private final DeploymentUnitFailover failover;
/**
* Constructor.
@@ -110,45 +112,23 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
* @param configuration Deployment configuration.
* @param cmgManager Cluster management group manager.
*/
- public DeploymentManagerImpl(ClusterService clusterService,
+ public DeploymentManagerImpl(
+ ClusterService clusterService,
MetaStorageManager metaStorage,
+ LogicalTopologyService logicalTopology,
Path workDir,
DeploymentConfiguration configuration,
- ClusterManagementGroupManager cmgManager) {
+ ClusterManagementGroupManager cmgManager
+ ) {
this.clusterService = clusterService;
this.configuration = configuration;
this.cmgManager = cmgManager;
this.workDir = workDir;
- tracker = new DeployTracker();
+ tracker = new DownloadTracker();
deployer = new FileDeployerService();
messaging = new DeployMessagingService(clusterService, cmgManager,
deployer, tracker);
deploymentUnitStore = new DeploymentUnitStoreImpl(metaStorage);
- }
-
- private void onUnitRegister(UnitNodeStatus status, Set<String>
deployedNodes) {
- if (status.status() == UPLOADING) {
- messaging.downloadUnitContent(status.id(), status.version(), new
ArrayList<>(deployedNodes))
- .thenCompose(content -> deployer.deploy(status.id(),
status.version(), content))
- .thenApply(deployed -> {
- if (deployed) {
- return deploymentUnitStore.updateNodeStatus(
- getLocalNodeId(),
- status.id(),
- status.version(),
- DEPLOYED);
- }
- return deployed;
- });
- } else if (status.status() == DEPLOYED) {
- deploymentUnitStore.getClusterStatus(status.id(), status.version())
- .thenApply(UnitClusterStatus::initialNodesToDeploy)
- .thenApply(deployedNodes::containsAll)
- .thenAccept(allRequiredDeployed -> {
- if (allRequiredDeployed) {
-
deploymentUnitStore.updateClusterStatus(status.id(), status.version(),
DEPLOYED);
- }
- });
- }
+ failover = new DeploymentUnitFailover(logicalTopology,
deploymentUnitStore, clusterService);
}
@Override
@@ -185,7 +165,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
LOG.error("Error reading deployment unit content", e);
return failedFuture(e);
}
- return tracker.track(id, version, deployToLocalNode(id, version,
unitContent)
+ return deployToLocalNode(id, version, unitContent)
.thenApply(completed -> {
if (completed) {
String localNodeId = getLocalNodeId();
@@ -196,8 +176,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
}));
}
return completed;
- })
- );
+ });
}
private CompletableFuture<Boolean> deployToLocalNode(String id, Version
version, UnitContent unitContent) {
@@ -322,12 +301,14 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
@Override
public void start() {
+ DefaultNodeCallback callback = new
DefaultNodeCallback(deploymentUnitStore, messaging, deployer, clusterService,
tracker);
deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value()));
deploymentUnitStore.registerListener(new NodeStatusWatchListener(
deploymentUnitStore,
this::getLocalNodeId,
- this::onUnitRegister));
+ callback));
messaging.subscribe();
+ failover.registerTopologyChangeCallback(callback);
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
similarity index 68%
rename from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
rename to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
index f3b57ec44e..0bdc2f202c 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
@@ -20,19 +20,18 @@ package org.apache.ignite.internal.deployunit;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.metastore.status.ClusterStatusKey;
-import org.apache.ignite.internal.future.InFlightFutures;
-import org.apache.ignite.lang.ByteArray;
/**
* Deploy actions tracker.
*/
-public class DeployTracker {
+public class DownloadTracker {
/**
* In flight futures tracker.
*/
- private final Map<ByteArray, InFlightFutures> inFlightFutures = new
ConcurrentHashMap<>();
+ private final Map<ClusterStatusKey, CompletableFuture<?>> inFlightFutures
= new ConcurrentHashMap<>();
/**
* Track deploy action.
@@ -43,9 +42,9 @@ public class DeployTracker {
* @param trackableAction Deploy action.
* @return {@param trackableAction}.
*/
- public <T> CompletableFuture<T> track(String id, Version version,
CompletableFuture<T> trackableAction) {
- ByteArray key =
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
- return inFlightFutures.computeIfAbsent(key, k -> new
InFlightFutures()).registerFuture(trackableAction);
+ public <T> CompletableFuture<T> track(String id, Version version,
Supplier<CompletableFuture<T>> trackableAction) {
+ ClusterStatusKey key =
ClusterStatusKey.builder().id(id).version(version).build();
+ return (CompletableFuture<T>) inFlightFutures.computeIfAbsent(key, k
-> trackableAction.get());
}
/**
@@ -54,10 +53,11 @@ public class DeployTracker {
* @param id Deployment unit identifier.
* @param version Deployment version identifier.
*/
- public void cancelIfDeploy(String id, Version version) {
- InFlightFutures futureTracker =
inFlightFutures.get(ClusterStatusKey.builder().id(id).version(version).build().toByteArray());
- if (futureTracker != null) {
- futureTracker.cancelInFlightFutures();
+ public void cancelIfDownloading(String id, Version version) {
+ ClusterStatusKey key =
ClusterStatusKey.builder().id(id).version(version).build();
+ CompletableFuture<?> future = inFlightFutures.remove(key);
+ if (future != null) {
+ future.cancel(true);
}
}
@@ -65,6 +65,6 @@ public class DeployTracker {
* Cancel all deploy actions.
*/
public void cancelAll() {
-
inFlightFutures.values().forEach(InFlightFutures::cancelInFlightFutures);
+ inFlightFutures.values().forEach(future -> future.cancel(true));
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
new file mode 100644
index 0000000000..6ed476baa5
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
+import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
+
+import java.util.Objects;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment unit failover.
+ *
+ * <p>Listens for logical topology join events and, when the joined node is the local node,
+ * re-triggers the uploading callback for units whose status on this node is still
+ * {@code UPLOADING}.
+ *
+ * <p>The local node is identified by the name of the local member of the topology service.
+ */
+public class DeploymentUnitFailover {
+ private final LogicalTopologyService logicalTopology;
+
+ private final DeploymentUnitStore deploymentUnitStore;
+
+ private final ClusterService clusterService;
+
+ /**
+ * Constructor.
+ *
+ * @param logicalTopology Logical topology service.
+ * @param deploymentUnitStore Deployment units store.
+ * @param clusterService Cluster service.
+ */
+ public DeploymentUnitFailover(
+ LogicalTopologyService logicalTopology,
+ DeploymentUnitStore deploymentUnitStore,
+ ClusterService clusterService
+ ) {
+ this.logicalTopology = logicalTopology;
+ this.deploymentUnitStore = deploymentUnitStore;
+ this.clusterService = clusterService;
+ }
+
+ /**
+ * Register {@link NodeEventCallback} as topology change callback.
+ *
+ * <p>The registered listener reacts only to {@code onNodeJoined} events whose joined node name
+ * equals the local node name. For such an event it:
+ * <ol>
+ * <li>loads the node statuses stored for that node;</li>
+ * <li>for each status that is {@code UPLOADING}, loads the cluster status of the same unit
+ * id and version;</li>
+ * <li>if that cluster status is {@code UPLOADING} or {@code DEPLOYED}, loads all nodes stored
+ * for the unit and invokes {@code callback.onUploading(nodeStatus, nodes)}.</li>
+ * </ol>
+ * Units whose cluster status is anything else are left untouched.
+ *
+ * <p>NOTE(review): the futures started here are not returned and have no error handling
+ * attached, so a failed store read goes unobserved by this class.
+ *
+ * <p>NOTE(review): {@code clusterStatus} is dereferenced without a null check - confirm that
+ * {@code getClusterStatus} never completes with {@code null} for a unit that has a node status.
+ *
+ * @param callback Node status callback.
+ */
+ public void registerTopologyChangeCallback(NodeEventCallback callback) {
+ logicalTopology.addEventListener(new LogicalTopologyEventListener() {
+ @Override
+ public void onNodeJoined(LogicalNode joinedNode,
LogicalTopologySnapshot newTopology) {
+ String consistentId = joinedNode.name();
+ if (Objects.equals(consistentId, getLocalNodeId())) {
+ deploymentUnitStore.getNodeStatuses(consistentId)
+ .thenAccept(nodeStatuses ->
nodeStatuses.forEach(nodeStatus -> {
+ if (nodeStatus.status() == UPLOADING) {
+
deploymentUnitStore.getClusterStatus(nodeStatus.id(), nodeStatus.version())
+ .thenAccept(clusterStatus -> {
+ if (clusterStatus.status() ==
UPLOADING || clusterStatus.status() == DEPLOYED) {
+
deploymentUnitStore.getAllNodes(nodeStatus.id(), nodeStatus.version())
+ .thenAccept(nodes
-> callback.onUploading(nodeStatus, nodes));
+ }
+ });
+ }
+ }));
+ }
+ }
+ });
+ }
+
+ private String getLocalNodeId() {
+ return clusterService.topologyService().localMember().name();
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
index 34cb08b8a5..e44731b88f 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
@@ -31,8 +31,6 @@ import
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
* Metastore for deployment units.
*/
public interface DeploymentUnitStore {
-
-
/**
* Register node statuses change events listener.
*
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
index e023af0c84..a74c31be0c 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeEventCallback.java
@@ -17,13 +17,12 @@
package org.apache.ignite.internal.deployunit.metastore;
-import java.util.Set;
+import java.util.List;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
/**
* Listener of deployment unit node status changes.
*/
-@FunctionalInterface
public interface NodeEventCallback {
/**
* Change event.
@@ -31,5 +30,38 @@ public interface NodeEventCallback {
* @param status Deployment unit status.
* @param holders Nodes consistent id.
*/
- void onUpdate(UnitNodeStatus status, Set<String> holders);
+ default void onUpdate(UnitNodeStatus status, List<String> holders) {
+ switch (status.status()) {
+ case UPLOADING:
+ onUploading(status, holders);
+ break;
+ case DEPLOYED:
+ onDeploy(status, holders);
+ break;
+ case REMOVING:
+ onRemoving(status, holders);
+ break;
+ case OBSOLETE:
+ onObsolete(status, holders);
+ break;
+ default:
+ break;
+ }
+ }
+
+ default void onUploading(UnitNodeStatus status, List<String> holders) {
+
+ }
+
+ default void onDeploy(UnitNodeStatus status, List<String> holders) {
+
+ }
+
+ default void onObsolete(UnitNodeStatus status, List<String> holders) {
+
+ }
+
+ default void onRemoving(UnitNodeStatus status, List<String> holders) {
+
+ }
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
index 1390467242..67a2d684f3 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.deployunit.metastore;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -45,7 +44,7 @@ public class NodeStatusWatchListener implements WatchListener
{
private final Supplier<String> localNodeProvider;
- private final NodeEventCallback listener;
+ private final NodeEventCallback callback;
private final ExecutorService executor = Executors.newFixedThreadPool(
4, new NamedThreadFactory("NodeStatusWatchListener-pool", LOG));
@@ -62,7 +61,7 @@ public class NodeStatusWatchListener implements WatchListener
{
NodeEventCallback callback) {
this.deploymentUnitStore = deploymentUnitStore;
this.localNodeProvider = localNodeProvider;
- this.listener = callback;
+ this.callback = callback;
}
@Override
@@ -84,7 +83,7 @@ public class NodeStatusWatchListener implements WatchListener
{
CompletableFuture.supplyAsync(() -> nodeStatus, executor)
.thenComposeAsync(status ->
deploymentUnitStore.getAllNodes(status.id(), status.version()), executor)
- .thenAccept(nodes -> listener.onUpdate(nodeStatus, new
HashSet<>(nodes)));
+ .thenAccept(nodes -> callback.onUpdate(nodeStatus, nodes));
}
return completedFuture(null);
}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 51ed8200f2..5474c10613 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -63,7 +63,12 @@ public class DeploymentUnitStoreImplTest {
private final List<UnitNodeStatus> history =
Collections.synchronizedList(new ArrayList<>());
- private final NodeEventCallback listener = (status, holders) ->
history.add(status);
+ private final NodeEventCallback listener = new NodeEventCallback() {
+ @Override
+ public void onUpdate(UnitNodeStatus status, List<String> holders) {
+ history.add(status);
+ }
+ };
private DeploymentUnitStoreImpl metastore;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 7ae97e511a..d7204c3c3f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -119,35 +120,59 @@ public class Cluster {
this.defaultNodeBootstrapConfigTemplate =
defaultNodeBootstrapConfigTemplate;
}
+ public void startAndInit(int nodeCount) {
+ startAndInit(nodeCount, new int[] { 0 });
+ }
+
/**
* Starts the cluster with the given number of nodes and initializes it.
*
* @param nodeCount Number of nodes in the cluster.
+ * @param cmgNodes Indices of CMG nodes.
*/
- public void startAndInit(int nodeCount) {
- startAndInit(nodeCount, builder -> {});
+ public void startAndInit(int nodeCount, int[] cmgNodes) {
+ startAndInit(nodeCount, cmgNodes, builder -> {});
}
/**
* Starts the cluster with the given number of nodes and initializes it.
*
* @param nodeCount Number of nodes in the cluster.
+ * @param cmgNodes Indices of CMG nodes.
+ * @param initParametersConfigurator Configure {@link InitParameters}
before initializing the cluster.
+ */
+ public void startAndInit(int nodeCount, int[] cmgNodes,
Consumer<InitParametersBuilder> initParametersConfigurator) {
+ startAndInit(nodeCount, cmgNodes, defaultNodeBootstrapConfigTemplate,
initParametersConfigurator);
+ }
+
+ /**
+ * Starts the cluster with the given number of nodes and initializes it
with CMG on first node.
+ *
+ * @param nodeCount Number of nodes in the cluster.
+ * @param nodeBootstrapConfigTemplate Node bootstrap config template to be
used for each node started
+ * with this call.
* @param initParametersConfigurator Configure {@link InitParameters}
before initializing the cluster.
*/
- public void startAndInit(int nodeCount, Consumer<InitParametersBuilder>
initParametersConfigurator) {
- startAndInit(nodeCount, defaultNodeBootstrapConfigTemplate,
initParametersConfigurator);
+ public void startAndInit(
+ int nodeCount,
+ String nodeBootstrapConfigTemplate,
+ Consumer<InitParametersBuilder> initParametersConfigurator
+ ) {
+ startAndInit(nodeCount, new int[] { 0 }, nodeBootstrapConfigTemplate,
initParametersConfigurator);
}
/**
* Starts the cluster with the given number of nodes and initializes it.
*
* @param nodeCount Number of nodes in the cluster.
+ * @param cmgNodes Indices of CMG nodes.
* @param nodeBootstrapConfigTemplate Node bootstrap config template to be
used for each node started
* with this call.
* @param initParametersConfigurator Configure {@link InitParameters}
before initializing the cluster.
*/
public void startAndInit(
int nodeCount,
+ int[] cmgNodes,
String nodeBootstrapConfigTemplate,
Consumer<InitParametersBuilder> initParametersConfigurator
) {
@@ -159,11 +184,11 @@ public class Cluster {
.mapToObj(nodeIndex -> startNodeAsync(nodeIndex,
nodeBootstrapConfigTemplate))
.collect(toList());
- String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
+ List<String> metaStorageAndCmgNodeNames =
Arrays.stream(cmgNodes).mapToObj(i -> testNodeName(testInfo,
i)).collect(toList());
InitParametersBuilder builder = InitParameters.builder()
- .destinationNodeName(metaStorageAndCmgNodeName)
- .metaStorageNodeNames(List.of(metaStorageAndCmgNodeName))
+ .destinationNodeName(metaStorageAndCmgNodeNames.get(0))
+ .metaStorageNodeNames(metaStorageAndCmgNodeNames)
.clusterName("cluster");
initParametersConfigurator.accept(builder);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 1f0132e21c..82cd58351f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -108,7 +108,7 @@ public abstract class ClusterPerTestIntegrationTest extends
IgniteIntegrationTes
cluster = new Cluster(testInfo, workDir,
getNodeBootstrapConfigTemplate());
if (initialNodes() > 0) {
- cluster.startAndInit(initialNodes());
+ cluster.startAndInit(initialNodes(), cmgMetastoreNodes());
}
}
@@ -135,6 +135,10 @@ public abstract class ClusterPerTestIntegrationTest
extends IgniteIntegrationTes
return 3;
}
+ protected int[] cmgMetastoreNodes() {
+ return new int[] { 0 };
+ }
+
/**
* Returns node bootstrap config template.
*
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
new file mode 100644
index 0000000000..5ba85fb712
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFile.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+class DeployFile { // Integration-test helper: a file to deploy, with its expected size and a replica timeout.
+ private final Path file;
+
+ private final long expectedSize;
+
+ private final int replicaTimeout; // NOTE(review): the time unit is not visible in this class - confirm at the call sites.
+
+ DeployFile(Path file, long expectedSize, int replicaTimeout) throws
IOException { // Stores the arguments, then makes sure the file exists on disk.
+ this.file = file;
+ this.expectedSize = expectedSize;
+ this.replicaTimeout = replicaTimeout;
+ ensureExists();
+ }
+
+ private void ensureExists() throws IOException {
+ if (!Files.exists(file)) { // An already existing file is left untouched, whatever its current size.
+ IgniteUtils.fillDummyFile(file, expectedSize); // Presumably writes expectedSize bytes of filler - TODO confirm.
+ }
+ }
+
+ Path file() {
+ return file;
+ }
+
+ long expectedSize() {
+ return expectedSize;
+ }
+
+ int replicaTimeout() {
+ return replicaTimeout;
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
new file mode 100644
index 0000000000..6abeb4bb6d
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/DeployFiles.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.deployunit.DeploymentUnit;
+import org.apache.ignite.internal.deployunit.UnitStatuses;
+import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
+
+/**
+ * Factory of dummy {@link DeployFile}s of several sizes, plus helpers that deploy them as a unit
+ * through a given node and assert that the files landed in that node's unit directory.
+ */
+class DeployFiles {
+    /** Base time budget for a single file; bigger files get a multiple of it. */
+    private static final int BASE_REPLICA_TIMEOUT = 30;
+
+    private static final long SMALL_IN_BYTES = 1024L;
+
+    private static final long MEDIUM_IN_BYTES = 1024L * 1024L;
+
+    private static final long BIG_IN_BYTES = 100 * 1024L * 1024L;
+
+    /** Directory in which the dummy files are created. */
+    private final Path workDir;
+
+    /** Lazily created by {@link #smallFile()}. */
+    private DeployFile smallFile;
+
+    /** Lazily created by {@link #mediumFile()}. */
+    private DeployFile mediumFile;
+
+    /** Lazily created by {@link #bigFile()}. */
+    private DeployFile bigFile;
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-19009
+    DeployFiles(Path workDir) {
+        this.workDir = workDir;
+    }
+
+    /**
+     * Creates a {@link DeployFile}, converting the checked I/O failure into an unchecked one
+     * so that the lazy getters stay free of {@code throws} clauses.
+     */
+    private static DeployFile create(Path path, long size, int replicaTimeout) {
+        try {
+            return new DeployFile(path, size, replicaTimeout);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create deploy file " + path, e);
+        }
+    }
+
+    /** Returns the small dummy file, creating it on first access. */
+    DeployFile smallFile() {
+        if (smallFile == null) {
+            smallFile = create(workDir.resolve("small.txt"), SMALL_IN_BYTES, BASE_REPLICA_TIMEOUT);
+        }
+        return smallFile;
+    }
+
+    /** Returns the medium dummy file, creating it on first access. */
+    DeployFile mediumFile() {
+        if (mediumFile == null) {
+            mediumFile = create(workDir.resolve("medium.txt"), MEDIUM_IN_BYTES, BASE_REPLICA_TIMEOUT * 2);
+        }
+        return mediumFile;
+    }
+
+    /** Returns the big dummy file, creating it on first access. */
+    DeployFile bigFile() {
+        if (bigFile == null) {
+            bigFile = create(workDir.resolve("big.txt"), BIG_IN_BYTES, BASE_REPLICA_TIMEOUT * 3);
+        }
+        return bigFile;
+    }
+
+    /** Deploys a single-file unit without the {@code force} flag. */
+    private Unit deployAndVerify(String id, Version version, DeployFile file, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, false, file, entryNode);
+    }
+
+    /** Deploys a single-file unit. */
+    private Unit deployAndVerify(String id, Version version, boolean force, DeployFile file, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, force, List.of(file), entryNode);
+    }
+
+    /**
+     * Deploys the given files as one unit through {@code entryNode}, asserts that the deployment future
+     * completes with {@code true} and that every file exists in the unit directory of {@code entryNode}.
+     *
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @param force Flag forwarded to {@code deployAsync}.
+     * @param files Files that make up the unit.
+     * @param entryNode Node through which the unit is deployed.
+     * @return Handle of the deployed unit.
+     */
+    public Unit deployAndVerify(String id, Version version, boolean force, List<DeployFile> files, IgniteImpl entryNode) {
+        List<Path> paths = files.stream()
+                .map(DeployFile::file)
+                .collect(Collectors.toList());
+
+        CompletableFuture<Boolean> deploy = entryNode.deployment()
+                .deployAsync(id, version, force, fromPaths(paths));
+
+        assertThat(deploy, willBe(true));
+
+        Unit unit = new Unit(entryNode, workDir, id, version, files);
+
+        Path nodeUnitDirectory = unit.getNodeUnitDirectory(entryNode);
+
+        for (DeployFile file : files) {
+            Path filePath = nodeUnitDirectory.resolve(file.file().getFileName());
+            assertTrue(Files.exists(filePath), "Unit file is missing on the entry node: " + filePath);
+        }
+
+        return unit;
+    }
+
+    /** Deploys a unit consisting of the small file and verifies it on {@code entryNode}. */
+    public Unit deployAndVerifySmall(String id, Version version, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, smallFile(), entryNode);
+    }
+
+    /** Deploys a unit consisting of the medium file and verifies it on {@code entryNode}. */
+    public Unit deployAndVerifyMedium(String id, Version version, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, mediumFile(), entryNode);
+    }
+
+    /** Deploys a unit consisting of the big file and verifies it on {@code entryNode}. */
+    public Unit deployAndVerifyBig(String id, Version version, IgniteImpl entryNode) {
+        return deployAndVerify(id, version, bigFile(), entryNode);
+    }
+
+    /**
+     * Builds the statuses expected for {@code id} when every given unit version is {@code DEPLOYED}.
+     *
+     * @param id Unit identifier.
+     * @param units Units whose versions are appended, in the given order.
+     * @return Statuses of the unit.
+     */
+    public static UnitStatuses buildStatus(String id, Unit... units) {
+        UnitStatusesBuilder builder = UnitStatuses.builder(id);
+        for (Unit unit : units) {
+            builder.append(unit.version(), DEPLOYED);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Opens every path and exposes the streams, keyed by file name, as a {@link DeploymentUnit}.
+     *
+     * <p>If one of the files cannot be opened, the streams opened so far are closed before the failure is
+     * rethrown, so that a partially built unit does not leak file handles.
+     * NOTE(review): on success the streams are handed over to the caller; presumably the deployment code
+     * closes them - confirm.
+     */
+    private static DeploymentUnit fromPaths(List<Path> paths) {
+        Objects.requireNonNull(paths);
+        Map<String, InputStream> map = new HashMap<>();
+        try {
+            for (Path path : paths) {
+                map.put(path.getFileName().toString(), Files.newInputStream(path));
+            }
+        } catch (IOException e) {
+            RuntimeException failure = new RuntimeException(e);
+
+            for (InputStream opened : map.values()) {
+                try {
+                    opened.close();
+                } catch (IOException closeFailure) {
+                    // Keep the original cause primary; report the close problem alongside it.
+                    failure.addSuppressed(closeFailure);
+                }
+            }
+
+            throw failure;
+        }
+        return () -> map;
+    }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
new file mode 100644
index 0000000000..7275899667
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitFailoverTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.deployunit.IgniteDeployment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link IgniteDeployment} that cover recovery of unit files
+ * when a node is stopped and started again.
+ */
+public class ItDeploymentUnitFailoverTest extends ClusterPerTestIntegrationTest {
+    /** Index of the node through which the unit is deployed. */
+    private static final int ENTRY_NODE_INDEX = 3;
+
+    /** Index of the node that is stopped and later started again. */
+    private static final int RESTARTED_NODE_INDEX = 8;
+
+    /** Source of dummy files, re-created before every test. */
+    private DeployFiles files;
+
+    @BeforeEach
+    public void generateDummy() {
+        this.files = new DeployFiles(workDir);
+    }
+
+    @Override
+    protected int initialNodes() {
+        return 9;
+    }
+
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        return new int[] {6, 7, 8};
+    }
+
+    @Test
+    public void testDeployWithNodeStop() {
+        // Taken before the stop: the handle is still used afterwards to locate the node's unit directory.
+        IgniteImpl stoppedNode = cluster.node(RESTARTED_NODE_INDEX);
+        Version unitVersion = Version.parseVersion("1.0.0");
+        Unit big = files.deployAndVerifyBig("id1", unitVersion, cluster.node(ENTRY_NODE_INDEX));
+
+        stopNode(RESTARTED_NODE_INDEX);
+
+        // The stopped node must end up without the unit files, the other two nodes with complete copies.
+        big.waitUnitClean(stoppedNode);
+        big.waitUnitReplica(cluster.node(6));
+        big.waitUnitReplica(cluster.node(7));
+
+        // After the restart the node is expected to obtain the unit files.
+        IgniteImpl restartedNode = startNode(RESTARTED_NODE_INDEX);
+        big.waitUnitReplica(restartedNode);
+    }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index b1251c2d6e..6931cebc11 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployment;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.deployment.DeployFiles.buildStatus;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.DEPLOYED;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -26,29 +27,16 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.deployunit.DeploymentUnit;
import org.apache.ignite.internal.deployunit.IgniteDeployment;
import org.apache.ignite.internal.deployunit.UnitStatuses;
-import org.apache.ignite.internal.deployunit.UnitStatuses.UnitStatusesBuilder;
-import
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
import org.apache.ignite.internal.rest.api.deployment.DeploymentStatus;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -56,44 +44,21 @@ import org.junit.jupiter.api.Test;
* Integration tests for {@link IgniteDeployment}.
*/
public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
- private static final int BASE_REPLICA_TIMEOUT = 30;
+ private DeployFiles files;
- private static final long SMALL_IN_BYTES = 1024L;
-
- private static final long MEDIUM_IN_BYTES = 1024L * 1024L;
-
- private static final long BIG_IN_BYTES = 1024L * 1024L * 1024L;
-
- private DeployFile smallFile;
-
- private DeployFile mediumFile;
-
- private DeployFile bigFile;
-
- private List<DeployFile> allFiles;
@BeforeEach
- public void generateDummy() throws IOException {
- smallFile = create("small.txt", SMALL_IN_BYTES, BASE_REPLICA_TIMEOUT);
- mediumFile = create("medium.txt", MEDIUM_IN_BYTES,
BASE_REPLICA_TIMEOUT * 2);
- // TODO https://issues.apache.org/jira/browse/IGNITE-19009
- // bigFile = create("big.txt", BIG_IN_BYTES, BASE_REPLICA_TIMEOUT * 3);
- allFiles = List.of(smallFile, mediumFile);
- }
-
- private DeployFile create(String name, long size, int replicaTimeout)
throws IOException {
- DeployFile deployFile = new DeployFile(workDir.resolve(name), size,
replicaTimeout);
- deployFile.ensureExists();
- return deployFile;
+ public void generateDummy() {
+ files = new DeployFiles(workDir);
}
@Test
public void testDeploy() {
String id = "test";
- Unit unit = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 1);
+ Unit unit = files.deployAndVerifySmall(id,
Version.parseVersion("1.1.0"), cluster.node(1));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, unit);
+ unit.waitUnitReplica(cmg);
UnitStatuses status = buildStatus(id, unit);
@@ -105,10 +70,15 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
@Test
public void deployDirectory() {
String id = "test";
- Unit unit = deployAndVerify(id, Version.parseVersion("1.1.0"), false,
allFiles, 1);
+ Unit unit = files.deployAndVerify(id,
+ Version.parseVersion("1.1.0"),
+ false,
+ List.of(files.smallFile(), files.mediumFile()),
+ cluster.node(1)
+ );
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, unit);
+ unit.waitUnitReplica(cmg);
UnitStatuses status = buildStatus(id, unit);
@@ -119,13 +89,13 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
@Test
public void testDeployUndeploy() {
- Unit unit = deployAndVerifySmall("test",
Version.parseVersion("1.1.0"), 1);
+ Unit unit = files.deployAndVerifySmall("test",
Version.parseVersion("1.1.0"), cluster.node(1));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, unit);
+ unit.waitUnitReplica(cmg);
unit.undeploy();
- waitUnitClean(cmg, unit);
+ unit.waitUnitClean(cmg);
CompletableFuture<List<UnitStatuses>> list =
node(2).deployment().clusterStatusesAsync();
assertThat(list, willBe(Collections.emptyList()));
@@ -134,12 +104,12 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
@Test
public void testDeployTwoUnits() {
String id = "test";
- Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"),
1);
- Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"),
2);
+ Unit unit1 = files.deployAndVerifySmall(id,
Version.parseVersion("1.1.0"), cluster.node(1));
+ Unit unit2 = files.deployAndVerifySmall(id,
Version.parseVersion("1.1.1"), cluster.node(2));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, unit1);
- waitUnitReplica(cmg, unit2);
+ unit1.waitUnitReplica(cmg);
+ unit2.waitUnitReplica(cmg);
UnitStatuses status = buildStatus(id, unit1, unit2);
@@ -147,19 +117,19 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
.pollDelay(100, MILLISECONDS)
.until(() -> node(2).deployment().clusterStatusesAsync(id),
willBe(status));
- CompletableFuture<List<Version>> versions =
node(2).deployment().versionsAsync(unit1.id);
- assertThat(versions, willBe(List.of(unit1.version, unit2.version)));
+ CompletableFuture<List<Version>> versions =
node(2).deployment().versionsAsync(unit1.id());
+ assertThat(versions, willBe(List.of(unit1.version(),
unit2.version())));
}
@Test
public void testDeployTwoUnitsAndUndeployOne() {
String id = "test";
- Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"),
1);
- Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"),
2);
+ Unit unit1 = files.deployAndVerifySmall(id,
Version.parseVersion("1.1.0"), cluster.node(1));
+ Unit unit2 = files.deployAndVerifySmall(id,
Version.parseVersion("1.1.1"), cluster.node(2));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, unit1);
- waitUnitReplica(cmg, unit2);
+ unit1.waitUnitReplica(cmg);
+ unit2.waitUnitReplica(cmg);
UnitStatuses status = buildStatus(id, unit1, unit2);
@@ -168,21 +138,21 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
.until(() -> node(2).deployment().clusterStatusesAsync(id),
willBe(status));
unit2.undeploy();
- CompletableFuture<List<Version>> newVersions =
node(2).deployment().versionsAsync(unit1.id);
- assertThat(newVersions, willBe(List.of(unit1.version)));
+ CompletableFuture<List<Version>> newVersions =
node(2).deployment().versionsAsync(unit1.id());
+ assertThat(newVersions, willBe(List.of(unit1.version())));
}
@Test
public void testDeploymentStatus() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit unit = deployAndVerifyMedium(id, version, 1);
+ Unit unit = files.deployAndVerifyMedium(id, version, cluster.node(1));
CompletableFuture<DeploymentStatus> status =
node(2).deployment().clusterStatusAsync(id, version);
assertThat(status, willBe(UPLOADING));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, unit);
+ unit.waitUnitReplica(cmg);
await().timeout(2, SECONDS)
.pollDelay(300, MILLISECONDS)
@@ -190,8 +160,8 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
assertThat(unit.undeployAsync(), willSucceedFast());
- waitUnitClean(unit.deployedNode, unit);
- waitUnitClean(cmg, unit);
+ unit.waitUnitClean(unit.deployedNode());
+ unit.waitUnitClean(cmg);
assertThat(node(2).deployment().clusterStatusAsync(id, version),
willBe(nullValue()));
}
@@ -200,223 +170,59 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
public void testRedeploy() {
String id = "test";
String version = "1.1.0";
- Unit smallUnit = deployAndVerify(id, Version.parseVersion(version),
smallFile, 1);
+ Unit smallUnit = files.deployAndVerifySmall(id,
Version.parseVersion(version), cluster.node(1));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, smallUnit);
+ smallUnit.waitUnitReplica(cmg);
- Unit mediumUnit = deployAndVerify(id, Version.parseVersion(version),
true, mediumFile, 1);
- waitUnitReplica(cmg, mediumUnit);
+ Unit mediumUnit = files.deployAndVerify(id,
Version.parseVersion(version), true, List.of(files.mediumFile()),
cluster.node(1));
+ mediumUnit.waitUnitReplica(cmg);
- waitUnitClean(smallUnit.deployedNode, smallUnit);
- waitUnitClean(cmg, smallUnit);
+ smallUnit.waitUnitClean(smallUnit.deployedNode());
+ smallUnit.waitUnitClean(cmg);
}
@Test
public void testOnDemandDeploy() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit smallUnit = deployAndVerify(id, version, smallFile, 1);
+ Unit smallUnit = files.deployAndVerifySmall(id, version,
cluster.node(1));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, smallUnit);
+ smallUnit.waitUnitReplica(cmg);
IgniteImpl onDemandDeployNode = cluster.node(2);
CompletableFuture<Boolean> onDemandDeploy =
onDemandDeployNode.deployment().onDemandDeploy(id, version);
assertThat(onDemandDeploy, willBe(true));
- waitUnitReplica(onDemandDeployNode, smallUnit);
+ smallUnit.waitUnitReplica(onDemandDeployNode);
}
@Test
public void testOnDemandDeployToDeployedNode() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit smallUnit = deployAndVerify(id, version, smallFile, 1);
+ Unit smallUnit = files.deployAndVerifySmall(id, version,
cluster.node(1));
IgniteImpl cmg = cluster.node(0);
- waitUnitReplica(cmg, smallUnit);
+ smallUnit.waitUnitReplica(cmg);
IgniteImpl onDemandDeployNode = cluster.node(1);
CompletableFuture<Boolean> onDemandDeploy =
onDemandDeployNode.deployment().onDemandDeploy(id, version);
assertThat(onDemandDeploy, willBe(true));
- waitUnitReplica(onDemandDeployNode, smallUnit);
+ smallUnit.waitUnitReplica(onDemandDeployNode);
}
@Test
public void testDeployToCmg() {
String id = "test";
Version version = Version.parseVersion("1.1.0");
- Unit smallUnit = deployAndVerify(id, version, smallFile, 0);
+ Unit smallUnit = files.deployAndVerifySmall(id, version,
cluster.node(0));
await().untilAsserted(() -> {
CompletableFuture<List<UnitStatuses>> list =
node(0).deployment().clusterStatusesAsync();
assertThat(list,
willBe(List.of(UnitStatuses.builder(id).append(version, DEPLOYED).build())));
});
}
-
- private UnitStatuses buildStatus(String id, Unit... units) {
- UnitStatusesBuilder builder = UnitStatuses.builder(id);
- for (Unit unit : units) {
- builder.append(unit.version, DEPLOYED);
- }
-
- return builder.build();
- }
-
- private Unit deployAndVerify(String id, Version version, DeployFile file,
int nodeIndex) {
- return deployAndVerify(id, version, false, file, nodeIndex);
- }
-
- private Unit deployAndVerify(String id, Version version, boolean force,
DeployFile file, int nodeIndex) {
- return deployAndVerify(id, version, force, List.of(file), nodeIndex);
- }
-
- private Unit deployAndVerify(String id, Version version, boolean force,
List<DeployFile> files, int nodeIndex) {
- IgniteImpl entryNode = node(nodeIndex);
-
- List<Path> paths = files.stream()
- .map(deployFile -> deployFile.file)
- .collect(Collectors.toList());
-
- CompletableFuture<Boolean> deploy = entryNode.deployment()
- .deployAsync(id, version, force, fromPaths(paths));
-
- assertThat(deploy, willBe(true));
-
- Unit unit = new Unit(entryNode, id, version, files);
-
- Path nodeUnitDirectory = getNodeUnitDirectory(entryNode, id, version);
-
- for (DeployFile file : files) {
- Path filePath = nodeUnitDirectory.resolve(file.file.getFileName());
- assertTrue(Files.exists(filePath));
- }
-
- return unit;
- }
-
- private Unit deployAndVerifySmall(String id, Version version, int
nodeIndex) {
- return deployAndVerify(id, version, smallFile, nodeIndex);
- }
-
- private Unit deployAndVerifyMedium(String id, Version version, int
nodeIndex) {
- return deployAndVerify(id, version, mediumFile, nodeIndex);
- }
-
- private Unit deployAndVerifyBig(String id, Version version, int nodeIndex)
{
- return deployAndVerify(id, version, bigFile, nodeIndex);
- }
-
- private Path getNodeUnitDirectory(IgniteImpl node, String unitId, Version
unitVersion) {
- String deploymentFolder = node.nodeConfiguration()
- .getConfiguration(DeploymentConfiguration.KEY)
- .deploymentLocation().value();
- Path resolve = workDir.resolve(node.name()).resolve(deploymentFolder);
- return resolve.resolve(unitId)
- .resolve(unitVersion.render());
- }
-
- private void waitUnitReplica(IgniteImpl ignite, Unit unit) {
- Path unitDirectory = getNodeUnitDirectory(ignite, unit.id,
unit.version);
-
- int combinedTimeout = unit.files.stream().map(file ->
file.replicaTimeout).reduce(Integer::sum).get();
-
- await().timeout(combinedTimeout, SECONDS)
- .pollDelay(1, SECONDS)
- .ignoreException(IOException.class)
- .until(() -> {
- for (DeployFile file : unit.files) {
- Path filePath =
unitDirectory.resolve(file.file.getFileName());
- if (Files.notExists(filePath) || Files.size(filePath)
!= file.expectedSize) {
- return false;
- }
- }
-
- return true;
- });
- }
-
- private void waitUnitClean(IgniteImpl ignite, Unit unit) {
- Path unitDirectory = getNodeUnitDirectory(ignite, unit.id,
unit.version);
-
- int combinedTimeout = unit.files.stream().map(file ->
file.replicaTimeout).reduce(Integer::sum).get();
-
- await().timeout(combinedTimeout, SECONDS)
- .pollDelay(2, SECONDS)
- .until(() -> {
- for (DeployFile file : unit.files) {
- Path filePath =
unitDirectory.resolve(file.file.getFileName());
- if (Files.exists(filePath)) {
- return false;
- }
- }
-
- return true;
- });
- }
-
- class Unit {
- private final IgniteImpl deployedNode;
-
- private final String id;
-
- private final Version version;
-
- private final List<DeployFile> files;
-
- Unit(IgniteImpl deployedNode, String id, Version version,
List<DeployFile> files) {
- this.deployedNode = deployedNode;
- this.id = id;
- this.version = version;
- this.files = files;
- }
-
- CompletableFuture<Boolean> undeployAsync() {
- return deployedNode.deployment().undeployAsync(id, version);
- }
-
- void undeploy() {
- deployedNode.deployment().undeployAsync(id, version);
- waitUnitClean(deployedNode, this);
- }
- }
-
- private static class DeployFile {
- private final Path file;
-
- private final long expectedSize;
-
- private final int replicaTimeout;
-
- private DeployFile(Path file, long expectedSize, int replicaTimeout) {
- this.file = file;
- this.expectedSize = expectedSize;
- this.replicaTimeout = replicaTimeout;
- }
-
- public void ensureExists() throws IOException {
- ensureFile(file, expectedSize);
- }
-
- private static void ensureFile(Path path, long size) throws
IOException {
- if (!Files.exists(path)) {
- IgniteUtils.fillDummyFile(path, size);
- }
- }
- }
-
- private static DeploymentUnit fromPaths(List<Path> paths) {
- Objects.requireNonNull(paths);
- Map<String, InputStream> map = new HashMap<>();
- try {
- for (Path path : paths) {
- map.put(path.getFileName().toString(),
Files.newInputStream(path));
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return () -> map;
- }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
new file mode 100644
index 0000000000..9253c27486
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/Unit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+
+/**
+ * Test handle of a deployed unit: remembers the node the unit was deployed through, its identifier,
+ * version and files, and offers file-system based waits for the unit to appear on, or vanish from, a node.
+ */
+class Unit {
+    /** Node through which the unit was deployed. */
+    private final IgniteImpl deployedNode;
+
+    private final String id;
+
+    private final Version version;
+
+    /** Files that make up the unit. */
+    private final List<DeployFile> files;
+
+    /** Root under which every node keeps its own directory (resolved by node name). */
+    private final Path workDir;
+
+    Unit(IgniteImpl deployedNode, Path workDir, String id, Version version, List<DeployFile> files) {
+        this.deployedNode = deployedNode;
+        this.workDir = workDir;
+        this.id = id;
+        this.version = version;
+        this.files = files;
+    }
+
+    public String id() {
+        return id;
+    }
+
+    public Version version() {
+        return version;
+    }
+
+    public List<DeployFile> files() {
+        return files;
+    }
+
+    IgniteImpl deployedNode() {
+        return deployedNode;
+    }
+
+    /** Requests undeployment through the node the unit was deployed on and returns the operation future. */
+    CompletableFuture<Boolean> undeployAsync() {
+        return deployedNode.deployment().undeployAsync(id, version);
+    }
+
+    /**
+     * Requests undeployment and waits until the unit files are gone from the node the unit was deployed on.
+     *
+     * <p>The future returned by {@code undeployAsync} is not awaited; completion is observed only through
+     * the file system.
+     */
+    void undeploy() {
+        deployedNode.deployment().undeployAsync(id, version);
+        waitUnitClean(deployedNode);
+    }
+
+    /**
+     * Waits until none of the unit files exists in the unit directory of the given node.
+     *
+     * @param ignite Node whose unit directory is checked.
+     */
+    void waitUnitClean(IgniteImpl ignite) {
+        Path unitDirectory = getNodeUnitDirectory(ignite);
+
+        int combinedTimeout = combinedReplicaTimeout();
+
+        await().timeout(combinedTimeout, SECONDS)
+                .pollDelay(2, SECONDS)
+                .until(() -> {
+                    for (DeployFile file : files) {
+                        Path filePath = unitDirectory.resolve(file.file().getFileName());
+                        if (Files.exists(filePath)) {
+                            return false;
+                        }
+                    }
+
+                    return true;
+                });
+    }
+
+    /**
+     * Resolves the directory of this unit on the given node:
+     * {@code workDir/<node name>/<deployment location>/<unit id>/<rendered version>}.
+     *
+     * @param ignite Node whose name and deployment configuration are used.
+     * @return Unit directory on that node.
+     */
+    Path getNodeUnitDirectory(IgniteImpl ignite) {
+        String deploymentFolder = ignite.nodeConfiguration()
+                .getConfiguration(DeploymentConfiguration.KEY)
+                .deploymentLocation().value();
+        return workDir
+                .resolve(ignite.name())
+                .resolve(deploymentFolder)
+                .resolve(id)
+                .resolve(version.render());
+    }
+
+    /** Resolves the directory of this unit on the node it was deployed through. */
+    public Path getNodeUnitDirectory() {
+        return getNodeUnitDirectory(deployedNode);
+    }
+
+    /**
+     * Waits until every unit file exists in the unit directory of the given node and has its expected size.
+     *
+     * @param ignite Node whose unit directory is checked.
+     */
+    void waitUnitReplica(IgniteImpl ignite) {
+        Path unitDirectory = getNodeUnitDirectory(ignite);
+
+        int combinedTimeout = combinedReplicaTimeout();
+
+        await().timeout(combinedTimeout, SECONDS)
+                .pollDelay(1, SECONDS)
+                // Files.size may fail while a file is still being written or replaced; poll again instead.
+                .ignoreException(IOException.class)
+                .until(() -> {
+                    for (DeployFile file : files) {
+                        Path filePath = unitDirectory.resolve(file.file().getFileName());
+                        if (Files.notExists(filePath) || Files.size(filePath) != file.expectedSize()) {
+                            return false;
+                        }
+                    }
+
+                    return true;
+                });
+    }
+
+    /** Sums the per-file time budgets; the result is used, in seconds, as the timeout of both waits. */
+    private int combinedReplicaTimeout() {
+        return files.stream().mapToInt(DeployFile::replicaTimeout).sum();
+    }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
index 3bc2568282..884629ec79 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java
@@ -70,7 +70,7 @@ class ItJdbcTest extends IgniteIntegrationTest {
@BeforeAll
void setUp(TestInfo testInfo, @WorkDirectory Path workDir) {
cluster = new Cluster(testInfo, workDir);
- cluster.startAndInit(1, builder -> builder.clusterConfiguration(
+ cluster.startAndInit(1, new int[]{ 0 }, builder ->
builder.clusterConfiguration(
"{\n"
+ " \"security\": {\n"
+ " \"authentication\": {\n"
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d937915332..74448f0f8f 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -531,6 +531,7 @@ public class IgniteImpl implements Ignite {
deploymentManager = new DeploymentManagerImpl(
clusterSvc,
metaStorageMgr,
+ logicalTopologyService,
workDir,
nodeConfigRegistry.getConfiguration(DeploymentConfiguration.KEY),
cmgMgr