This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bde0357dd0 IGNITE-18973 Introduce unit statuses (#1809)
bde0357dd0 is described below
commit bde0357dd0f72cea773e4f3663fe3ca47df51de5
Author: Mikhail <[email protected]>
AuthorDate: Fri Mar 24 17:09:05 2023 +0300
IGNITE-18973 Introduce unit statuses (#1809)
---
.../apache/ignite/deployment/DeploymentInfo.java | 115 +++++++++
.../ignite/deployment/DeploymentStatus.java} | 29 ++-
.../apache/ignite/deployment/IgniteDeployment.java | 11 -
.../org/apache/ignite/deployment/UnitStatus.java | 36 +--
.../deployunit/DeployMessagingService.java | 232 ++++++++++++++++++
.../deployunit/DeployMetastoreService.java | 133 ++++++++++
.../ignite/internal/deployunit/DeployTracker.java | 70 ++++++
.../internal/deployunit/DeploymentManagerImpl.java | 244 +++++--------------
.../internal/deployunit/FileDeployerService.java | 113 +++++++++
.../ignite/internal/deployunit/UnitMeta.java | 32 ++-
.../ignite/internal/deployunit/key/UnitKey.java | 5 +-
.../deployunit/key/UnitMetaSerializer.java | 8 +-
.../deployunit/message/DeployUnitMessageTypes.java | 4 +
.../deployunit/message/DeployUnitResponse.java | 8 +-
...loyUnitResponse.java => StopDeployRequest.java} | 21 +-
...oyUnitResponse.java => StopDeployResponse.java} | 16 +-
.../deployunit/message/UndeployUnitResponse.java | 8 -
.../deployunit/metastore/ListAccumulator.java | 58 +++++
.../metastore/UnitStatusAccumulator.java | 9 +-
.../deployunit/metastore/UnitsAccumulator.java | 7 +-
.../ignite/deployment/UnitMetaSerializerTest.java | 11 +-
.../ignite/internal/future/InFlightFutures.java | 3 +-
.../deployment/DeploymentManagementController.java | 2 +-
modules/runner/build.gradle | 1 +
.../internal/deployment/ItDeploymentUnitTest.java | 269 +++++++++++++++------
25 files changed, 1104 insertions(+), 341 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java
new file mode 100644
index 0000000000..68b909285e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Data class with deployment unit information.
+ */
+public class DeploymentInfo {
+ private final DeploymentStatus status;
+
+ private final List<String> consistentIds;
+
+ private DeploymentInfo(DeploymentStatus status, List<String>
consistentIds) {
+ this.status = status;
+ this.consistentIds = Collections.unmodifiableList(consistentIds);
+ }
+
+ public DeploymentStatus status() {
+ return status;
+ }
+
+ public List<String> consistentIds() {
+ return consistentIds;
+ }
+
+ public static DeploymentInfoBuilder builder() {
+ return new DeploymentInfoBuilder();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DeploymentInfo that = (DeploymentInfo) o;
+
+ return status == that.status
+ && (consistentIds != null ?
consistentIds.equals(that.consistentIds) : that.consistentIds == null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = status != null ? status.hashCode() : 0;
+ result = 31 * result + (consistentIds != null ?
consistentIds.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "DeploymentInfo{"
+ + "status=" + status
+ + ", consistentIds=[" + String.join(", ", consistentIds) + "]"
+ + '}';
+ }
+
+ /**
+ * Builder for {@link DeploymentInfo}.
+ */
+ public static final class DeploymentInfoBuilder {
+ private DeploymentStatus status;
+ private final List<String> consistentIds = new ArrayList<>();
+
+ public DeploymentInfoBuilder status(DeploymentStatus status) {
+ this.status = status;
+ return this;
+ }
+
+ public DeploymentInfoBuilder addConsistentId(String consistentId) {
+ consistentIds.add(consistentId);
+ return this;
+ }
+
+ public DeploymentInfoBuilder addConsistentIds(Collection<String>
consistentIds) {
+ this.consistentIds.addAll(consistentIds);
+ return this;
+ }
+
+ /**
+ * Build {@link DeploymentInfo} instance.
+ *
+ * @return {@link DeploymentInfo} instance.
+ */
+ public DeploymentInfo build() {
+ Objects.requireNonNull(status);
+
+ return new DeploymentInfo(status, consistentIds);
+ }
+
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java
similarity index 60%
copy from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
copy to
modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java
index ee298c8a1d..41cf27d845 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++
b/modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.deployunit.message;
-
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
-import org.apache.ignite.network.annotations.Transferable;
+package org.apache.ignite.deployment;
/**
- * Deploy unit response.
+ * Status of deployment process.
*/
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
-public interface DeployUnitResponse extends NetworkMessage {
- /**
- * Returns error which happens on deploy process.
- *
- * @return error which happens on deploy process.
- */
- @Marshallable
- Throwable error();
+public enum DeploymentStatus {
+ /** Unit deployment is in progress. */
+ UPLOADING,
+
+ /** Unit is deployed on the cluster. */
+ DEPLOYED,
+
+ /** Remove command was initiated for the unit and it will be removed soon.
*/
+ OBSOLETE,
+
+ /** Unit removal from the cluster is in progress. */
+ REMOVING
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
index d74ccf8db3..63514b091a 100644
---
a/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
+++
b/modules/api/src/main/java/org/apache/ignite/deployment/IgniteDeployment.java
@@ -25,17 +25,6 @@ import org.apache.ignite.deployment.version.Version;
* Provides access to the Deployment Unit functionality.
*/
public interface IgniteDeployment {
- /**
- * Deploy provided unit to current node with latest version.
- *
- * @param id Unit identifier. Not empty and not null.
- * @param deploymentUnit Unit content.
- * @return Future with success or not result.
- */
- default CompletableFuture<Boolean> deployAsync(String id, DeploymentUnit
deploymentUnit) {
- return deployAsync(id, Version.LATEST, deploymentUnit);
- }
-
/**
* Deploy provided unit to current node.
* After deploy finished, this deployment unit will be place to CMG group
asynchronously.
diff --git
a/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
b/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
index 771a0fd8db..478d67a220 100644
--- a/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
+++ b/modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.ignite.deployment.version.Version;
-import org.apache.ignite.internal.tostring.S;
/**
* Deployment unit status.
@@ -38,7 +37,7 @@ public class UnitStatus {
/**
* Map from existing unit version to list of nodes consistent ids where
unit deployed.
*/
- private final Map<Version, List<String>> versionToConsistentIds;
+ private final Map<Version, DeploymentInfo> versionToDeploymentInfo;
/**
* Constructor.
@@ -47,9 +46,10 @@ public class UnitStatus {
* @param versionToConsistentIds Map from existing unit version to list
* of nodes consistent ids where unit deployed.
*/
- private UnitStatus(String id, Map<Version, List<String>>
versionToConsistentIds) {
+ private UnitStatus(String id,
+ Map<Version, DeploymentInfo> versionToConsistentIds) {
this.id = id;
- this.versionToConsistentIds =
Collections.unmodifiableMap(versionToConsistentIds);
+ this.versionToDeploymentInfo =
Collections.unmodifiableMap(versionToConsistentIds);
}
/**
@@ -67,7 +67,7 @@ public class UnitStatus {
* @return unit version.
*/
public Set<Version> versions() {
- return Collections.unmodifiableSet(versionToConsistentIds.keySet());
+ return Collections.unmodifiableSet(versionToDeploymentInfo.keySet());
}
/**
@@ -77,7 +77,11 @@ public class UnitStatus {
* @return consistent ids of nodes for provided version.
*/
public List<String> consistentIds(Version version) {
- return
Collections.unmodifiableList(versionToConsistentIds.get(version));
+ return
Collections.unmodifiableList(versionToDeploymentInfo.get(version).consistentIds());
+ }
+
+ public DeploymentStatus status(Version version) {
+ return versionToDeploymentInfo.get(version).status();
}
/**
@@ -102,19 +106,21 @@ public class UnitStatus {
return false;
}
UnitStatus that = (UnitStatus) o;
- return Objects.equals(id, that.id) &&
Objects.equals(versionToConsistentIds, that.versionToConsistentIds);
+ return Objects.equals(id, that.id) &&
Objects.equals(versionToDeploymentInfo, that.versionToDeploymentInfo);
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(id, versionToConsistentIds);
+ return Objects.hash(id, versionToDeploymentInfo);
}
- /** {@inheritDoc} */
@Override
public String toString() {
- return S.toString(this);
+ return "UnitStatus{"
+ + "id='" + id + '\''
+ + ", versionToDeploymentInfo=" + versionToDeploymentInfo
+ + '}';
}
/**
@@ -123,7 +129,7 @@ public class UnitStatus {
public static class UnitStatusBuilder {
private final String id;
- private final Map<Version, List<String>> versionToConsistentIds = new
HashMap<>();
+ private final Map<Version, DeploymentInfo> versionToInfo = new
HashMap<>();
/**
* Constructor.
@@ -138,11 +144,11 @@ public class UnitStatus {
* Append node consistent ids with provided version.
*
* @param version Unit version.
- * @param consistentIds Node consistent ids.
+ * @param deploymentInfo Node consistent ids.
* @return {@code this} builder for use in a chained invocation.
*/
- public UnitStatusBuilder append(Version version, List<String>
consistentIds) {
- versionToConsistentIds.put(version, consistentIds);
+ public UnitStatusBuilder append(Version version, DeploymentInfo
deploymentInfo) {
+ versionToInfo.put(version, deploymentInfo);
return this;
}
@@ -152,7 +158,7 @@ public class UnitStatus {
* @return {@link UnitStatus} instance.
*/
public UnitStatus build() {
- return new UnitStatus(id, versionToConsistentIds);
+ return new UnitStatus(id, versionToInfo);
}
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
new file mode 100644
index 0000000000..6370a5a3f3
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.version.Version;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.StopDeployRequest;
+import org.apache.ignite.internal.deployunit.message.StopDeployRequestImpl;
+import org.apache.ignite.internal.deployunit.message.StopDeployResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Messaging service for deploy actions.
+ */
+public class DeployMessagingService {
+ private static final IgniteLogger LOG =
Loggers.forClass(DeployMessagingService.class);
+
+ private static final ChannelType DEPLOYMENT_CHANNEL =
ChannelType.register((short) 1, "DeploymentUnits");
+
+ /**
+ * Cluster service.
+ */
+ private final ClusterService clusterService;
+
+ /**
+ * Cluster management group manager.
+ */
+ private final ClusterManagementGroupManager cmgManager;
+
+ /**
+ * File deployer service.
+ */
+ private final FileDeployerService deployerService;
+
+ /**
+ * Tracker of deploy actions.
+ */
+ private final DeployTracker tracker;
+
+ /**
+ * Constructor.
+ *
+ * @param clusterService Cluster service.
+ * @param cmgManager CMG manager.
+ * @param deployerService File deploying service.
+ * @param tracker Deploy action tracker.
+ */
+ public DeployMessagingService(
+ ClusterService clusterService,
+ ClusterManagementGroupManager cmgManager,
+ FileDeployerService deployerService,
+ DeployTracker tracker
+ ) {
+ this.clusterService = clusterService;
+ this.cmgManager = cmgManager;
+ this.deployerService = deployerService;
+ this.tracker = tracker;
+ }
+
+ /**
+ * Subscribe to all deployment messages.
+ */
+ public void subscribe() {
+
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
+ (message, senderConsistentId, correlationId) -> {
+ if (message instanceof DeployUnitRequest) {
+ processDeployRequest((DeployUnitRequest) message,
senderConsistentId, correlationId);
+ } else if (message instanceof UndeployUnitRequest) {
+ processUndeployRequest((UndeployUnitRequest) message,
senderConsistentId, correlationId);
+ } else if (message instanceof StopDeployRequest) {
+ processStopDeployRequest((StopDeployRequest) message,
senderConsistentId, correlationId);
+ }
+ });
+ }
+
+ /**
+ * Start deployment process to all nodes from CMG group.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @param unitName Deployment unit file name.
+ * @param unitContent Deployment unit file content.
+ * @return Future with deployment result.
+ */
+ public CompletableFuture<List<String>> startDeployAsyncToCmg(String id,
Version version, String unitName, byte[] unitContent) {
+ DeployUnitRequest request = DeployUnitRequestImpl.builder()
+ .unitName(unitName)
+ .id(id)
+ .version(version.render())
+ .unitContent(unitContent)
+ .build();
+ return cmgManager.cmgNodes()
+ .thenCompose(nodes -> deploy(nodes, request));
+ }
+
+ private CompletableFuture<List<String>> deploy(Set<String> nodes,
DeployUnitRequest request) {
+ CompletableFuture<List<String>> resultFuture = new
CompletableFuture<>();
+ Map<String, Boolean> results = new ConcurrentHashMap<>();
+ CompletableFuture<Void> allDeployment = CompletableFuture.allOf(
+ nodes.stream().map(node ->
clusterService.topologyService().getByConsistentId(node))
+ .map(node -> requestDeploy(node, request)
+ .thenAccept(deployed ->
results.put(node.name(), deployed)))
+ .toArray(CompletableFuture[]::new));
+
+ allDeployment.thenAccept(v -> resultFuture.complete(
+ results.entrySet().stream()
+ .filter(Entry::getValue)
+ .map(Entry::getKey)
+ .collect(Collectors.toList()))
+ );
+
+ return resultFuture;
+ }
+
+ /**
+ * Stop all in-progress deployment processes for deployment unit with
provided id and version.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @return Future with stop result.
+ */
+ public CompletableFuture<Void> stopInProgressDeploy(String id, Version
version) {
+ LOG.info("Stop in progress deploy for " + id + ":" + version);
+ return CompletableFuture.allOf(cmgManager.logicalTopology()
+ .thenApply(topology -> topology.nodes().stream().map(node ->
+ clusterService.messagingService()
+ .invoke(node,
+ DEPLOYMENT_CHANNEL,
+ StopDeployRequestImpl
+ .builder()
+ .id(id)
+
.version(version.render())
+ .build(),
+ Long.MAX_VALUE)
+ ).toArray(CompletableFuture[]::new)));
+ }
+
+ /**
+ * Start undeploy process from provided node with provided id and version.
+ *
+ * @param node Cluster node.
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @return Future with undeploy result.
+ */
+ public CompletableFuture<Void> undeploy(ClusterNode node, String id,
Version version) {
+ return clusterService.messagingService()
+ .invoke(node,
+ DEPLOYMENT_CHANNEL,
+ UndeployUnitRequestImpl.builder()
+ .id(id)
+ .version(version.render())
+ .build(),
+ Long.MAX_VALUE
+ ).thenAccept(message ->
+ LOG.info("Undeploy unit " + id + ":" + version + "
from node " + node + " finished"));
+ }
+
+ private CompletableFuture<Boolean> requestDeploy(ClusterNode clusterNode,
DeployUnitRequest request) {
+ return clusterService.messagingService()
+ .invoke(clusterNode, DEPLOYMENT_CHANNEL, request,
Long.MAX_VALUE)
+ .thenCompose(message -> {
+ boolean success = ((DeployUnitResponse) message).success();
+ if (!success) {
+ LOG.error("Failed to deploy unit " + request.id() +
":" + request.version()
+ + " to node " + clusterNode);
+ }
+ return CompletableFuture.completedFuture(success);
+ });
+ }
+
+ private void processStopDeployRequest(StopDeployRequest request, String
senderConsistentId, long correlationId) {
+ tracker.cancelIfDeploy(request.id(),
Version.parseVersion(request.version()));
+ clusterService.messagingService()
+ .respond(senderConsistentId,
StopDeployResponseImpl.builder().build(), correlationId);
+
+ }
+
+ private void processDeployRequest(DeployUnitRequest executeRequest, String
senderConsistentId, long correlationId) {
+ String id = executeRequest.id();
+ String version = executeRequest.version();
+ deployerService.deploy(id, version, executeRequest.unitName(),
executeRequest.unitContent())
+ .thenAccept(success ->
clusterService.messagingService().respond(
+ senderConsistentId,
+ DEPLOYMENT_CHANNEL,
+
DeployUnitResponseImpl.builder().success(success).build(),
+ correlationId)
+ );
+ }
+
+ private void processUndeployRequest(UndeployUnitRequest executeRequest,
String senderConsistentId, long correlationId) {
+ LOG.info("Start to undeploy " + executeRequest.id() + " with version "
+ executeRequest.version() + " from "
+ + clusterService.topologyService().localMember().name());
+ deployerService.undeploy(executeRequest.id(), executeRequest.version())
+ .thenRun(() -> clusterService.messagingService()
+ .respond(senderConsistentId,
UndeployUnitResponseImpl.builder().build(), correlationId));
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMetastoreService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMetastoreService.java
new file mode 100644
index 0000000000..1222bb32a6
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMetastoreService.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.key.UnitKey.allUnits;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+import static org.apache.ignite.internal.deployunit.key.UnitKey.withId;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.function.Consumer;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Service for metastore access of deployment units.
+ */
+public class DeployMetastoreService {
+ private static final IgniteLogger LOG =
Loggers.forClass(DeployMetastoreService.class);
+
+ /**
+ * Meta storage.
+ */
+ private final MetaStorageManager metaStorage;
+
+ public DeployMetastoreService(MetaStorageManager metaStorage) {
+ this.metaStorage = metaStorage;
+ }
+
+ public CompletableFuture<Boolean> updateMeta(String id, Version version,
Consumer<UnitMeta> transformer) {
+ return updateMeta(id, version, false, transformer);
+ }
+
+ /**
+ * Update deployment unit meta.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @param transformer Deployment unit meta transformer.
+ * @return Future with update result.
+ */
+ public CompletableFuture<Boolean> updateMeta(String id, Version version,
boolean force, Consumer<UnitMeta> transformer) {
+ LOG.info("Update meta for " + id + ":" + version);
+ ByteArray key = key(id, version);
+ return metaStorage.get(key)
+ .thenCompose(e -> {
+ LOG.info("Revision " + e.revision() + " for " + id + ":" +
version);
+ if (e.value() == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ UnitMeta prev = UnitMetaSerializer.deserialize(e.value());
+
+ transformer.accept(prev);
+
+ return metaStorage.invoke(
+ force ? exists(key) :
revision(key).le(e.revision()),
+ put(key, UnitMetaSerializer.serialize(prev)),
+ noop());
+ });
+ }
+
+ /**
+ * Put deployment unit meta if not exists.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @param meta Deployment unit meta.
+ * @return Future with put result.
+ */
+ public CompletableFuture<Boolean> putIfNotExist(String id, Version
version, UnitMeta meta) {
+ ByteArray key = key(id, version);
+ Operation put = put(key, UnitMetaSerializer.serialize(meta));
+ return metaStorage.invoke(notExists(key), put, noop());
+ }
+
+ /**
+ * Remove deployment unit meta if exist.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ */
+ public void removeIfExist(String id, Version version) {
+ ByteArray key = key(id, version);
+ metaStorage.invoke(exists(key), remove(key), noop());
+ }
+
+ /**
+ * Returns {@link Publisher} instance with all deployment unit metas.
+ *
+ * @return {@link Publisher} instance with all deployment unit metas.
+ */
+ public Publisher<Entry> getAll() {
+ return metaStorage.prefix(allUnits());
+ }
+
+ /**
+ * Returns {@link Publisher} instance with all deployment unit metas with
provided id.
+ *
+ * @param id Deployment unit identifier.
+ * @return {@link Publisher} instance with all deployment unit metas with
provided id.
+ */
+ public Publisher<Entry> getAllWithId(String id) {
+ return metaStorage.prefix(withId(id));
+ }
+
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
new file mode 100644
index 0000000000..a32a3e978d
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Deploy actions tracker.
+ */
+public class DeployTracker {
+ /**
+ * In flight futures tracker.
+ */
+ private final Map<ByteArray, InFlightFutures> inFlightFutures = new
ConcurrentHashMap<>();
+
+ /**
+ * Track deploy action.
+ *
+ * @param <T> Future result type.
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @param trackableAction Deploy action.
+ * @return {@param trackableAction}.
+ */
+ public <T> CompletableFuture<T> track(String id, Version version,
CompletableFuture<T> trackableAction) {
+ return inFlightFutures.computeIfAbsent(key(id, version), k -> new
InFlightFutures()).registerFuture(trackableAction);
+ }
+
+ /**
+ * Cancel deploy action for deployment unit with provided id and version
if exists.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment version identifier.
+ */
+ public void cancelIfDeploy(String id, Version version) {
+ InFlightFutures futureTracker = inFlightFutures.get(key(id, version));
+ if (futureTracker != null) {
+ futureTracker.cancelInFlightFutures();
+ }
+ }
+
+ /**
+ * Cancel all deploy actions.
+ */
+ public void cancelAll() {
+
inFlightFutures.values().forEach(InFlightFutures::cancelInFlightFutures);
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 855b902844..470110e673 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -17,26 +17,18 @@
package org.apache.ignite.internal.deployunit;
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.SYNC;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-import static org.apache.ignite.internal.deployunit.key.UnitKey.allUnits;
-import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
-import static org.apache.ignite.internal.deployunit.key.UnitKey.withId;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.ignite.deployment.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.deployment.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.deployment.DeploymentStatus.REMOVING;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.DeploymentStatus;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.deployment.IgniteDeployment;
import org.apache.ignite.deployment.UnitStatus;
@@ -47,32 +39,14 @@ import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitAlreadyExis
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
import org.apache.ignite.internal.deployunit.key.UnitMetaSerializer;
-import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
-import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
-import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
-import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
-import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
-import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
-import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitResponse;
-import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
import org.apache.ignite.internal.deployunit.metastore.EntrySubscriber;
import org.apache.ignite.internal.deployunit.metastore.SortedListAccumulator;
import org.apache.ignite.internal.deployunit.metastore.UnitStatusAccumulator;
import org.apache.ignite.internal.deployunit.metastore.UnitsAccumulator;
-import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ChannelType;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
/**
@@ -82,14 +56,10 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
private static final IgniteLogger LOG =
Loggers.forClass(DeploymentManagerImpl.class);
- private static final String TMP_SUFFIX = ".tmp";
-
- private static final ChannelType DEPLOYMENT_CHANNEL =
ChannelType.register((short) 1, "DeploymentUnits");
-
/**
- * Meta storage.
+ * Node working directory.
*/
- private final MetaStorageManager metaStorage;
+ private final Path workDir;
/**
* Deployment configuration.
@@ -101,20 +71,20 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
*/
private final ClusterManagementGroupManager cmgManager;
- /**
- * In flight futures tracker.
- */
- private final InFlightFutures inFlightFutures = new InFlightFutures();
/**
* Cluster service.
*/
private final ClusterService clusterService;
- /**
- * Folder for units.
- */
- private Path unitsFolder;
+
+ private final DeployMessagingService messaging;
+
+ private final FileDeployerService deployer;
+
+ private final DeployMetastoreService metastore;
+
+ private final DeployTracker tracker;
/**
* Constructor.
@@ -131,10 +101,13 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
DeploymentConfiguration configuration,
ClusterManagementGroupManager cmgManager) {
this.clusterService = clusterService;
- this.metaStorage = metaStorage;
this.configuration = configuration;
this.cmgManager = cmgManager;
- unitsFolder = workDir;
+ this.workDir = workDir;
+ this.tracker = new DeployTracker();
+ metastore = new DeployMetastoreService(metaStorage);
+ deployer = new FileDeployerService();
+ messaging = new DeployMessagingService(clusterService, cmgManager,
deployer, tracker);
}
@Override
@@ -143,67 +116,46 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
Objects.requireNonNull(version);
Objects.requireNonNull(deploymentUnit);
- ByteArray key = key(id, version.render());
- UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(),
Collections.emptyList());
-
- Operation put = put(key, UnitMetaSerializer.serialize(meta));
-
- DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+ UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(),
DeploymentStatus.UPLOADING, Collections.emptyList());
+ byte[] unitContent;
try {
- builder.unitContent(deploymentUnit.content().readAllBytes());
+ unitContent = deploymentUnit.content().readAllBytes();
} catch (IOException e) {
LOG.error("Error to read deployment unit content", e);
return CompletableFuture.failedFuture(new
DeploymentUnitReadException(e));
}
- DeployUnitRequest request = builder
- .unitName(deploymentUnit.name())
- .id(id)
- .version(version.render())
- .build();
- return metaStorage.invoke(notExists(key), put, Operations.noop())
+ CompletableFuture<Boolean> result = metastore.putIfNotExist(id,
version, meta)
.thenCompose(success -> {
if (success) {
- return doDeploy(request);
+ return deployer.deploy(id, version.render(),
deploymentUnit.name(), unitContent);
}
LOG.error("Failed to deploy meta of unit " + id + ":" +
version);
return CompletableFuture.failedFuture(
new DeploymentUnitAlreadyExistsException(id,
"Unit " + id + ":" + version + " already
exists"));
})
+ .thenCompose(deployed -> {
+ if (deployed) {
+ return metastore.updateMeta(id, version,
+ meta1 ->
meta1.addConsistentId(clusterService.topologyService().localMember().name()));
+ }
+ return CompletableFuture.completedFuture(false);
+ })
.thenApply(completed -> {
if (completed) {
- startDeployAsyncToCmg(request);
+ messaging.startDeployAsyncToCmg(id, version,
deploymentUnit.name(), unitContent)
+ .thenAccept(ids -> metastore.updateMeta(id,
version, unitMeta -> {
+ for (String consistentId : ids) {
+ unitMeta.addConsistentId(consistentId);
+ }
+ unitMeta.updateStatus(DEPLOYED);
+ }));
}
return completed;
});
- }
-
- private void startDeployAsyncToCmg(DeployUnitRequest request) {
- cmgManager.cmgNodes()
- .thenAccept(nodes -> {
- for (String node : nodes) {
- ClusterNode clusterNode =
clusterService.topologyService().getByConsistentId(node);
- if (clusterNode != null) {
-
inFlightFutures.registerFuture(requestDeploy(clusterNode, request));
- }
- }
- });
- }
-
- private CompletableFuture<Boolean> requestDeploy(ClusterNode clusterNode,
DeployUnitRequest request) {
- return clusterService.messagingService()
- .invoke(clusterNode, request, Long.MAX_VALUE)
- .thenCompose(message -> {
- Throwable error = ((DeployUnitResponse) message).error();
- if (error != null) {
- LOG.error("Failed to deploy unit " + request.id() +
":" + request.version()
- + " to node " + clusterNode, error);
- return CompletableFuture.failedFuture(error);
- }
- return CompletableFuture.completedFuture(true);
- });
+ return tracker.track(id, version, result);
}
@Override
@@ -211,9 +163,15 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
checkId(id);
Objects.requireNonNull(version);
- ByteArray key = key(id, version.render());
-
- return metaStorage.invoke(exists(key), Operations.remove(key),
Operations.noop())
+ return messaging.stopInProgressDeploy(id, version)
+ .thenCompose(v -> metastore.updateMeta(id, version, true, meta
-> meta.updateStatus(OBSOLETE)))
+ .thenCompose(success -> {
+ if (success) {
+ //TODO: Check unit usages here. If unit used in
compute task we cannot just remove it.
+ return metastore.updateMeta(id, version, true, meta ->
meta.updateStatus(REMOVING));
+ }
+ return CompletableFuture.completedFuture(false);
+ })
.thenCompose(success -> {
if (success) {
return cmgManager.logicalTopology();
@@ -221,22 +179,10 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
return CompletableFuture.failedFuture(new
DeploymentUnitNotFoundException(
"Unit " + id + " with version " + version + "
doesn't exist"));
}).thenApply(logicalTopologySnapshot -> {
- for (ClusterNode node : logicalTopologySnapshot.nodes()) {
- clusterService.messagingService()
- .invoke(node, DEPLOYMENT_CHANNEL,
- UndeployUnitRequestImpl.builder()
- .id(id)
- .version(version.render())
- .build(),
- Long.MAX_VALUE)
- .thenAccept(message -> {
- Throwable error = ((UndeployUnitResponse)
message).error();
- if (error != null) {
- LOG.error("Failed to undeploy unit " +
id + ":" + version
- + " from node " + node, error);
- }
- });
- }
+ allOf(logicalTopologySnapshot.nodes().stream()
+ .map(node -> messaging.undeploy(node, id, version))
+ .toArray(CompletableFuture[]::new))
+ .thenAccept(unused -> metastore.removeIfExist(id,
version));
return null;
});
}
@@ -244,7 +190,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
@Override
public CompletableFuture<List<UnitStatus>> unitsAsync() {
CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
- metaStorage.prefix(allUnits())
+ metastore.getAll()
.subscribe(new EntrySubscriber<>(result, new
UnitsAccumulator()));
return result;
}
@@ -253,7 +199,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
public CompletableFuture<List<Version>> versionsAsync(String id) {
checkId(id);
CompletableFuture<List<Version>> result = new CompletableFuture<>();
- metaStorage.prefix(withId(id))
+ metastore.getAllWithId(id)
.subscribe(
new EntrySubscriber<>(
result,
@@ -267,7 +213,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
public CompletableFuture<UnitStatus> statusAsync(String id) {
checkId(id);
CompletableFuture<UnitStatus> result = new CompletableFuture<>();
- metaStorage.prefix(withId(id))
+ metastore.getAllWithId(id)
.subscribe(new EntrySubscriber<>(result, new
UnitStatusAccumulator(id)));
return result;
}
@@ -277,7 +223,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
Objects.requireNonNull(consistentId);
CompletableFuture<List<UnitStatus>> result = new CompletableFuture<>();
- metaStorage.prefix(allUnits())
+ metastore.getAll()
.subscribe(
new EntrySubscriber<>(
result,
@@ -289,83 +235,13 @@ public class DeploymentManagerImpl implements
IgniteDeployment, IgniteComponent
@Override
public void start() {
- unitsFolder =
unitsFolder.resolve(configuration.deploymentLocation().value());
-
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
- (message, senderConsistentId, correlationId) -> {
- if (message instanceof DeployUnitRequest) {
- processDeployRequest((DeployUnitRequest) message,
senderConsistentId, correlationId);
- } else if (message instanceof UndeployUnitRequest) {
- processUndeployRequest((UndeployUnitRequest) message,
senderConsistentId, correlationId);
- }
- });
- }
-
- private void processDeployRequest(DeployUnitRequest executeRequest, String
senderConsistentId, long correlationId) {
- doDeploy(executeRequest).whenComplete((success, throwable) -> {
- DeployUnitResponseBuilder builder =
DeployUnitResponseImpl.builder();
- if (throwable != null) {
- builder.error(throwable);
- }
- clusterService.messagingService().respond(senderConsistentId,
DEPLOYMENT_CHANNEL,
- builder.build(), correlationId);
- });
- }
-
- private void processUndeployRequest(UndeployUnitRequest executeRequest,
String senderConsistentId, long correlationId) {
- try {
- Path unitPath = unitsFolder
- .resolve(executeRequest.id())
- .resolve(executeRequest.version());
-
- IgniteUtils.deleteIfExistsThrowable(unitPath);
- } catch (IOException e) {
- LOG.error("Failed to undeploy unit " + executeRequest.id() + ":" +
executeRequest.version(), e);
- clusterService.messagingService()
- .respond(senderConsistentId, DEPLOYMENT_CHANNEL,
UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
- return;
- }
-
- clusterService.messagingService()
- .respond(senderConsistentId, DEPLOYMENT_CHANNEL,
UndeployUnitResponseImpl.builder().build(), correlationId);
- }
-
- private CompletableFuture<Boolean> doDeploy(DeployUnitRequest
executeRequest) {
- String id = executeRequest.id();
- String version = executeRequest.version();
- try {
- Path unitPath = unitsFolder
- .resolve(executeRequest.id())
- .resolve(executeRequest.version())
- .resolve(executeRequest.unitName());
-
- Path unitPathTmp = unitPath.resolveSibling(unitPath.getFileName()
+ TMP_SUFFIX);
-
- Files.createDirectories(unitPathTmp.getParent());
-
- Files.write(unitPathTmp, executeRequest.unitContent(), CREATE,
SYNC, TRUNCATE_EXISTING);
- Files.move(unitPathTmp, unitPath, ATOMIC_MOVE, REPLACE_EXISTING);
- } catch (IOException e) {
- LOG.error("Failed to deploy unit " + executeRequest.id() + ":" +
executeRequest.version(), e);
- return CompletableFuture.failedFuture(e);
- }
-
- ByteArray key = key(id, version);
- return metaStorage.get(key)
- .thenCompose(e -> {
- UnitMeta prev = UnitMetaSerializer.deserialize(e.value());
-
-
prev.addConsistentId(clusterService.topologyService().localMember().name());
-
- return metaStorage.invoke(
- revision(key).eq(e.revision()),
- put(key, UnitMetaSerializer.serialize(prev)),
- Operations.noop());
- });
+
deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value()));
+ messaging.subscribe();
}
@Override
public void stop() throws Exception {
- inFlightFutures.cancelInFlightFutures();
+ tracker.cancelAll();
}
private static void checkId(String id) {
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
new file mode 100644
index 0000000000..73715e879b
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.SYNC;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Service for file deploying on local File System.
+ */
+public class FileDeployerService {
+ private static final IgniteLogger LOG =
Loggers.forClass(FileDeployerService.class);
+
+ private static final String TMP_SUFFIX = ".tmp";
+
+ private static final int DEPLOYMENT_EXECUTOR_SIZE = 4;
+
+ /**
+ * Folder for units.
+ */
+ private Path unitsFolder;
+
+
+ private final ExecutorService executor = Executors.newFixedThreadPool(
+ DEPLOYMENT_EXECUTOR_SIZE, new NamedThreadFactory("deployment",
LOG));
+
+ public void initUnitsFolder(Path unitsFolder) {
+ this.unitsFolder = unitsFolder;
+ }
+
+ /**
+ * Deploy provided unit on local fs.
+ *
+ * @param id Deploy unit identifier.
+ * @param version Deploy unit version.
+ * @param unitName Deploy unit file name.
+ * @param unitContent Deploy unit content.
+ * @return Future with deploy result.
+ */
+ public CompletableFuture<Boolean> deploy(String id, String version, String
unitName, byte[] unitContent) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Path unitPath = unitsFolder
+ .resolve(id)
+ .resolve(version)
+ .resolve(unitName);
+
+ Path unitPathTmp =
unitPath.resolveSibling(unitPath.getFileName() + TMP_SUFFIX);
+
+ Files.createDirectories(unitPathTmp.getParent());
+
+ Files.write(unitPathTmp, unitContent, CREATE, SYNC,
TRUNCATE_EXISTING);
+ Files.move(unitPathTmp, unitPath, ATOMIC_MOVE,
REPLACE_EXISTING);
+ return true;
+ } catch (IOException e) {
+ LOG.error("Failed to deploy unit " + id + ":" + version, e);
+ return false;
+ }
+ }, executor);
+ }
+
+ /**
+ * Undeploy unit with provided identifier and version.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @return Future with undeploy result
+ */
+ public CompletableFuture<Boolean> undeploy(String id, String version) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Path unitPath = unitsFolder
+ .resolve(id)
+ .resolve(version);
+
+ IgniteUtils.deleteIfExistsThrowable(unitPath);
+ return true;
+ } catch (IOException e) {
+ LOG.error("Failed to undeploy unit " + id + ":" + version, e);
+ return false;
+ }
+ }, executor);
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
index e8b119e593..29e5f0f8df 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployunit;
import java.util.ArrayList;
import java.util.List;
+import org.apache.ignite.deployment.DeploymentStatus;
import org.apache.ignite.deployment.version.Version;
/**
@@ -40,6 +41,11 @@ public class UnitMeta {
*/
private final String name;
+ /**
+ * Deployment status.
+ */
+ private DeploymentStatus status;
+
/**
* Consistent ids of nodes with.
*/
@@ -53,10 +59,11 @@ public class UnitMeta {
* @param name Unit name.
* @param consistentIdLocation Consistent ids of nodes where unit deployed.
*/
- public UnitMeta(String id, Version version, String name, List<String>
consistentIdLocation) {
+ public UnitMeta(String id, Version version, String name, DeploymentStatus
status, List<String> consistentIdLocation) {
this.id = id;
this.version = version;
this.name = name;
+ this.status = status;
this.consistentIdLocation.addAll(consistentIdLocation);
}
@@ -105,6 +112,14 @@ public class UnitMeta {
consistentIdLocation.add(id);
}
+ public DeploymentStatus status() {
+ return status;
+ }
+
+ public void updateStatus(DeploymentStatus status) {
+ this.status = status;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -125,6 +140,9 @@ public class UnitMeta {
if (name != null ? !name.equals(meta.name) : meta.name != null) {
return false;
}
+ if (status != meta.status) {
+ return false;
+ }
return consistentIdLocation.equals(meta.consistentIdLocation);
}
@@ -133,7 +151,19 @@ public class UnitMeta {
int result = id != null ? id.hashCode() : 0;
result = 31 * result + (version != null ? version.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
+ result = 31 * result + (status != null ? status.hashCode() : 0);
result = 31 * result + consistentIdLocation.hashCode();
return result;
}
+
+ @Override
+ public String toString() {
+ return "UnitMeta{"
+ + "id='" + id + '\''
+ + ", version=" + version
+ + ", name='" + name + '\''
+ + ", status=" + status
+ + ", consistentIdLocation=" + String.join(", ",
consistentIdLocation)
+ + '}';
+ }
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
index fe61f27d17..e79a5dbfc6 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitKey.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.deployunit.key;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Base64.Encoder;
+import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.lang.ByteArray;
/**
@@ -61,14 +62,14 @@ public final class UnitKey {
* @param version Required unit version.
* @return Key in {@link ByteArray} format.
*/
- public static ByteArray key(String id, String version) {
+ public static ByteArray key(String id, Version version) {
StringBuilder sb = new StringBuilder(UNITS_PREFIX);
Encoder encoder = Base64.getEncoder();
if (id != null) {
sb.append(encoder.encodeToString(id.getBytes(StandardCharsets.UTF_8)));
if (version != null) {
sb.append(":");
-
sb.append(encoder.encodeToString(version.getBytes(StandardCharsets.UTF_8)));
+
sb.append(encoder.encodeToString(version.render().getBytes(StandardCharsets.UTF_8)));
}
}
return new ByteArray(sb.toString());
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
index 18f7e9b2f4..624bdd17ab 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/key/UnitMetaSerializer.java
@@ -24,6 +24,7 @@ import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Base64.Encoder;
import java.util.List;
+import org.apache.ignite.deployment.DeploymentStatus;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.deployunit.UnitMeta;
@@ -52,6 +53,7 @@ public final class UnitMetaSerializer {
appendWithEncoding(sb, meta.id());
appendWithEncoding(sb, meta.version().render());
appendWithEncoding(sb, meta.name());
+ appendWithEncoding(sb, meta.status().name());
for (String id : meta.consistentIdLocation()) {
appendWithEncoding(sb, id);
@@ -81,11 +83,13 @@ public final class UnitMetaSerializer {
String version = new String(decoder.decode(split[1]), UTF_8);
String unitName = new String(decoder.decode(split[2]), UTF_8);
+ DeploymentStatus status = DeploymentStatus.valueOf(new
String(decoder.decode(split[3]), UTF_8));
+
List<String> ids = new ArrayList<>();
- for (int i = 3; i < split.length; i++) {
+ for (int i = 4; i < split.length; i++) {
ids.add(new String(decoder.decode(split[i]), UTF_8));
}
- return new UnitMeta(id, Version.parseVersion(version), unitName, ids);
+ return new UnitMeta(id, Version.parseVersion(version), unitName,
status, ids);
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
index 54337f7fd5..19d5b8f5ce 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitMessageTypes.java
@@ -44,4 +44,8 @@ public class DeployUnitMessageTypes {
* Message type for {@link UndeployUnitResponse}.
*/
public static final short UNDEPLOY_UNIT_RESPONSE = 3;
+
+ public static final short STOP_DEPLOY_REQUEST = 4;
+
+ public static final short STOP_DEPLOY_RESPONSE = 5;
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
index ee298c8a1d..084874304b 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.deployunit.message;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -27,10 +26,9 @@ import org.apache.ignite.network.annotations.Transferable;
@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
public interface DeployUnitResponse extends NetworkMessage {
/**
- * Returns error which happens on deploy process.
+ * Shows success or not deploy process.
*
- * @return error which happens on deploy process.
+ * @return success or not deploy process.
*/
- @Marshallable
- Throwable error();
+ boolean success();
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployRequest.java
similarity index 72%
copy from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
copy to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployRequest.java
index ee298c8a1d..06bcf74f0e 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployRequest.java
@@ -18,19 +18,24 @@
package org.apache.ignite.internal.deployunit.message;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
- * Deploy unit response.
+ * Stop deploy request.
*/
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
-public interface DeployUnitResponse extends NetworkMessage {
+@Transferable(DeployUnitMessageTypes.STOP_DEPLOY_REQUEST)
+public interface StopDeployRequest extends NetworkMessage {
/**
- * Returns error which happens on deploy process.
+ * Returns deployment unit identifier.
*
- * @return error which happens on deploy process.
+ * @return deployment unit identifier.
*/
- @Marshallable
- Throwable error();
+ String id();
+
+ /**
+ * Returns deployment unit version.
+ *
+ * @return deployment unit version.
+ */
+ String version();
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployResponse.java
similarity index 72%
copy from
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
copy to
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployResponse.java
index ee298c8a1d..84a96a251a 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/DeployUnitResponse.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/StopDeployResponse.java
@@ -17,20 +17,14 @@
package org.apache.ignite.internal.deployunit.message;
+import static
org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes.STOP_DEPLOY_RESPONSE;
+
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
- * Deploy unit response.
+ * Stop deploy request.
*/
-@Transferable(DeployUnitMessageTypes.DEPLOY_UNIT_RESPONSE)
-public interface DeployUnitResponse extends NetworkMessage {
- /**
- * Returns error which happens on deploy process.
- *
- * @return error which happens on deploy process.
- */
- @Marshallable
- Throwable error();
+@Transferable(STOP_DEPLOY_RESPONSE)
+public interface StopDeployResponse extends NetworkMessage {
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
index e430724ac4..3eccbb243f 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/message/UndeployUnitResponse.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.deployunit.message;
import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -26,11 +25,4 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(DeployUnitMessageTypes.UNDEPLOY_UNIT_RESPONSE)
public interface UndeployUnitResponse extends NetworkMessage {
- /**
- * Returns error which happens on undeploy process.
- *
- * @return error which happens on undeploy process.
- */
- @Marshallable
- Throwable error();
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ListAccumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ListAccumulator.java
new file mode 100644
index 0000000000..225e0b5d71
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ListAccumulator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit.metastore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.internal.metastorage.Entry;
+
+/**
+ * Plain list accumulator.
+ *
+ * @param <T> Result value type.
+ */
+public class ListAccumulator<T extends Comparable<T>> implements
Accumulator<List<T>> {
+ private final Function<Entry, T> mapper;
+
+ private final List<T> result = new ArrayList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param mapper Value mapper.
+ */
+ public ListAccumulator(Function<Entry, T> mapper) {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void accumulate(Entry value) {
+ T apply = mapper.apply(value);
+ if (apply != null) {
+ result.add(apply);
+ }
+ }
+
+ @Override
+ public List<T> get() throws AccumulateException {
+ Collections.sort(result);
+ return result;
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
index db4b2d4686..1a96868589 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitStatusAccumulator.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.deployunit.metastore;
+import org.apache.ignite.deployment.DeploymentInfo;
import org.apache.ignite.deployment.UnitStatus;
import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
import org.apache.ignite.internal.deployunit.UnitMeta;
@@ -46,8 +47,12 @@ public class UnitStatusAccumulator implements
Accumulator<UnitStatus> {
if (builder == null) {
builder = UnitStatus.builder(id);
}
- UnitMeta deserialize = UnitMetaSerializer.deserialize(item.value());
- builder.append(deserialize.version(),
deserialize.consistentIdLocation());
+ UnitMeta meta = UnitMetaSerializer.deserialize(item.value());
+ builder.append(meta.version(),
+ DeploymentInfo.builder()
+ .status(meta.status())
+ .addConsistentIds(meta.consistentIdLocation()).build()
+ );
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
index e911c88981..6d1e91c555 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/UnitsAccumulator.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentInfo;
import org.apache.ignite.deployment.UnitStatus;
import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
import org.apache.ignite.internal.deployunit.UnitMeta;
@@ -49,7 +50,11 @@ public class UnitsAccumulator implements
Accumulator<List<UnitStatus>> {
UnitMeta meta = UnitMetaSerializer.deserialize(item.value());
if (filter.test(meta)) {
map.computeIfAbsent(meta.id(), UnitStatus::builder)
- .append(meta.version(), meta.consistentIdLocation());
+ .append(meta.version(),
+ DeploymentInfo.builder()
+ .status(meta.status())
+
.addConsistentIds(meta.consistentIdLocation()).build()
+ );
}
}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
index 695b187cdb..09501557a4 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitMetaSerializerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.deployment;
+import static org.apache.ignite.deployment.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.deserialize;
import static
org.apache.ignite.internal.deployunit.key.UnitMetaSerializer.serialize;
import static org.hamcrest.Matchers.is;
@@ -35,7 +36,7 @@ import org.junit.jupiter.api.Test;
public class UnitMetaSerializerTest {
@Test
public void testSerializeDeserializeLatest() {
- UnitMeta meta = new UnitMeta("id", Version.LATEST, "unitName",
Arrays.asList("id1", "id2"));
+ UnitMeta meta = new UnitMeta("id", Version.LATEST, "unitName",
UPLOADING, Arrays.asList("id1", "id2"));
byte[] serialize = serialize(meta);
@@ -44,7 +45,7 @@ public class UnitMetaSerializerTest {
@Test
public void testSerializeDeserializeUnit() {
- UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"),
"unitName", Arrays.asList("id1", "id2"));
+ UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"),
"unitName", UPLOADING, Arrays.asList("id1", "id2"));
byte[] serialize = serialize(meta);
@@ -53,7 +54,7 @@ public class UnitMetaSerializerTest {
@Test
public void testSerializeDeserializeUnitIncompleteVersion() {
- UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0"),
"unitName", Arrays.asList("id1", "id2"));
+ UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0"),
"unitName", UPLOADING, Arrays.asList("id1", "id2"));
byte[] serialize = serialize(meta);
@@ -62,7 +63,7 @@ public class UnitMetaSerializerTest {
@Test
public void testSerializeDeserializeUnitEmptyConsistentId() {
- UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"),
"unitName", Collections.emptyList());
+ UnitMeta meta = new UnitMeta("id", Version.parseVersion("3.0.0"),
"unitName", UPLOADING, Collections.emptyList());
byte[] serialize = serialize(meta);
@@ -72,7 +73,7 @@ public class UnitMetaSerializerTest {
@Test
public void testSerializeDeserializeWithSeparatorCharInIdName() {
- UnitMeta meta = new UnitMeta("id;", Version.parseVersion("3.0.0"),
"unitName;", Collections.emptyList());
+ UnitMeta meta = new UnitMeta("id;", Version.parseVersion("3.0.0"),
"unitName;", UPLOADING, Collections.emptyList());
byte[] serialize = serialize(meta);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
index 70174fa308..7a7bd215f0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
@@ -36,10 +36,11 @@ public class InFlightFutures implements
Iterable<CompletableFuture<?>> {
*
* @param future the future to register
*/
- public void registerFuture(CompletableFuture<?> future) {
+ public <T> CompletableFuture<T> registerFuture(CompletableFuture<T>
future) {
inFlightFutures.add(future);
future.whenComplete((result, ex) -> inFlightFutures.remove(future));
+ return future;
}
/**
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
index 19c382dc74..0178afef8a 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/deployment/DeploymentManagementController.java
@@ -46,7 +46,7 @@ public class DeploymentManagementController implements
DeploymentCodeApi {
try {
DeploymentUnit deploymentUnit = toDeploymentUnit(unitContent);
if (unitVersion == null || unitVersion.isBlank()) {
- return deployment.deployAsync(unitId, deploymentUnit);
+ return deployment.deployAsync(unitId, Version.LATEST,
deploymentUnit);
}
return deployment.deployAsync(unitId,
Version.parseVersion(unitVersion), deploymentUnit);
} catch (IOException e) {
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index dc58e3b731..26ea20da90 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -133,6 +133,7 @@ dependencies {
integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
integrationTestImplementation libs.jetbrains.annotations
+ integrationTestImplementation libs.awaitility
integrationTestImplementation libs.rocksdb.jni
integrationTestImplementation libs.disruptor
integrationTestImplementation libs.jackson.databind
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
index 8aaf283ac6..a3fb8e1db9 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java
@@ -19,7 +19,15 @@ package org.apache.ignite.internal.deployment;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.deployment.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.deployment.DeploymentStatus.REMOVING;
+import static org.apache.ignite.deployment.DeploymentStatus.UPLOADING;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -34,7 +42,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import org.apache.ignite.deployment.DeploymentInfo;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.deployment.UnitStatus;
import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
@@ -42,7 +50,7 @@ import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -50,81 +58,95 @@ import org.junit.jupiter.api.Test;
* Integration tests for {@link org.apache.ignite.deployment.IgniteDeployment}.
*/
public class ItDeploymentUnitTest extends ClusterPerTestIntegrationTest {
- private static final long REPLICA_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
- private static final long SIZE_IN_BYTES = 1024L;
+ private static final int BASE_REPLICA_TIMEOUT = 30;
- private Path dummyFile;
+ private static final long SMALL_IN_BYTES = 1024L;
+
+ private static final long MEDIUM_IN_BYTES = 1024L * 1024L;
+
+ private static final long BIG_IN_BYTES = 1024L * 1024L * 1024L;
+
+ private DeployFile smallFile;
+
+ private DeployFile mediumFile;
+
+ private DeployFile bigFile;
@BeforeEach
public void generateDummy() throws IOException {
- dummyFile = workDir.resolve("dummy.txt");
-
- if (!Files.exists(dummyFile)) {
- try (SeekableByteChannel channel = Files.newByteChannel(dummyFile,
WRITE, CREATE)) {
- channel.position(SIZE_IN_BYTES - 4);
+ smallFile = create("small.txt", SMALL_IN_BYTES, BASE_REPLICA_TIMEOUT);
+ mediumFile = create("medium.txt", MEDIUM_IN_BYTES,
BASE_REPLICA_TIMEOUT * 2);
+ bigFile = create("big.txt", BIG_IN_BYTES, BASE_REPLICA_TIMEOUT * 3);
+ }
- ByteBuffer buf = ByteBuffer.allocate(4).putInt(2);
- buf.rewind();
- channel.write(buf);
- }
- }
+ private DeployFile create(String name, long size, int replicaTimeout)
throws IOException {
+ DeployFile deployFile = new DeployFile(workDir.resolve(name), size,
replicaTimeout);
+ deployFile.ensureExist();
+ return deployFile;
}
@Test
- public void testDeploy() throws Exception {
- Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+ public void testDeploy() {
+ String id = "test";
+ Unit unit = deployAndVerifySmall(id, Version.parseVersion("1.1.0"), 1);
IgniteImpl cmg = cluster.node(0);
waitUnitReplica(cmg, unit);
- CompletableFuture<List<UnitStatus>> list =
node(2).deployment().unitsAsync();
- UnitStatusBuilder builder =
UnitStatus.builder(unit.id).append(unit.version,
List.of(unit.deployedNode.name(), cmg.name()));
- assertThat(list, willBe(List.of(builder.build())));
+ UnitStatus status = buildStatus(id, unit);
+
+ await().timeout(2, SECONDS)
+ .pollDelay(500, MILLISECONDS)
+ .until(() -> node(2).deployment().unitsAsync(),
willBe(Collections.singletonList(status)));
}
@Test
- public void testDeployUndeploy() throws Exception {
- Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+ public void testDeployUndeploy() {
+ Unit unit = deployAndVerifySmall("test",
Version.parseVersion("1.1.0"), 1);
unit.undeploy();
+ IgniteImpl cmg = cluster.node(0);
+ waitUnitClean(cmg, unit);
+
CompletableFuture<List<UnitStatus>> list =
node(2).deployment().unitsAsync();
assertThat(list, willBe(Collections.emptyList()));
}
@Test
- public void testDeployTwoUnits() throws Exception {
+ public void testDeployTwoUnits() {
String id = "test";
- Unit unit1 = deployAndVerify(id, Version.parseVersion("1.1.0"), 1);
- Unit unit2 = deployAndVerify(id, Version.parseVersion("1.1.1"), 2);
+ Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"),
1);
+ Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"),
2);
IgniteImpl cmg = cluster.node(0);
waitUnitReplica(cmg, unit1);
waitUnitReplica(cmg, unit2);
- CompletableFuture<UnitStatus> list =
node(2).deployment().statusAsync(id);
- UnitStatusBuilder status = UnitStatus.builder(id)
- .append(unit1.version, List.of(unit1.deployedNode.name(),
cmg.name()))
- .append(unit2.version, List.of(unit2.deployedNode.name(),
cmg.name()));
- assertThat(list, willBe(status.build()));
+ UnitStatus status = buildStatus(id, unit1, unit2);
+
+ await().timeout(2, SECONDS)
+ .pollDelay(100, MILLISECONDS)
+ .until(() -> node(2).deployment().statusAsync(id),
willBe(status));
CompletableFuture<List<Version>> versions =
node(2).deployment().versionsAsync(unit1.id);
assertThat(versions, willBe(List.of(unit1.version, unit2.version)));
}
@Test
- public void testDeployTwoUnitsAndUndeployOne() throws Exception {
- Unit unit1 = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
- Unit unit2 = deployAndVerify("test", Version.parseVersion("1.1.1"), 2);
+ public void testDeployTwoUnitsAndUndeployOne() {
+ String id = "test";
+ Unit unit1 = deployAndVerifySmall(id, Version.parseVersion("1.1.0"),
1);
+ Unit unit2 = deployAndVerifySmall(id, Version.parseVersion("1.1.1"),
2);
IgniteImpl cmg = cluster.node(0);
waitUnitReplica(cmg, unit1);
waitUnitReplica(cmg, unit2);
- CompletableFuture<UnitStatus> list =
node(2).deployment().statusAsync(unit2.id);
- UnitStatusBuilder builder = UnitStatus.builder(unit1.id)
- .append(unit1.version, List.of(unit1.deployedNode.name(),
cmg.name()))
- .append(unit2.version, List.of(unit2.deployedNode.name(),
cmg.name()));
- assertThat(list, willBe(builder.build()));
+ UnitStatus status = buildStatus(id, unit1, unit2);
+
+ await().timeout(2, SECONDS)
+ .pollDelay(500, MILLISECONDS)
+ .until(() -> node(2).deployment().statusAsync(id),
willBe(status));
unit2.undeploy();
CompletableFuture<List<Version>> newVersions =
node(2).deployment().versionsAsync(unit1.id);
@@ -132,77 +154,149 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
}
@Test
- public void testFindByConsistentId() throws InterruptedException {
+ public void testDeploymentStatus() {
+ String id = "test";
+ Version version = Version.parseVersion("1.1.0");
+ Unit unit = deployAndVerifyMedium(id, version, 1);
+
+ CompletableFuture<UnitStatus> status =
node(2).deployment().statusAsync(id);
+ assertThat(status.thenApply(status1 -> status1.status(version)),
willBe(UPLOADING));
+
+ IgniteImpl cmg = cluster.node(0);
+ waitUnitReplica(cmg, unit);
+
+ await().timeout(2, SECONDS)
+ .pollDelay(300, MILLISECONDS)
+ .until(() -> node(2).deployment().statusAsync(id)
+ .thenApply(status1 -> status1.status(version)),
willBe(DEPLOYED));
+
+ assertThat(unit.undeployAsync(), willSucceedFast());
+
+ assertThat(node(1).deployment().statusAsync(id)
+ .thenApply(status1 -> status1.status(version)),
willBe(REMOVING));
+
+ waitUnitClean(unit.deployedNode, unit);
+ waitUnitClean(cmg, unit);
+
+ assertThat(node(2).deployment().statusAsync(id)
+ .thenApply(status1 -> status1.status(version)),
willThrowFast(DeploymentUnitNotFoundException.class));
+ }
+
+ @Test
+ public void testFindByConsistentId() {
String id = "test";
String version = "1.1.0";
- Unit unit = deployAndVerify(id, Version.parseVersion(version), 1);
+ Unit unit = deployAndVerify(id, Version.parseVersion(version),
mediumFile, 1);
IgniteImpl cmg = cluster.node(0);
waitUnitReplica(cmg, unit);
- IgniteImpl node = node(1);
+ IgniteImpl node = unit.deployedNode;
+
CompletableFuture<List<UnitStatus>> nodes =
node.deployment().findUnitByConsistentIdAsync(node.name());
assertThat(nodes, willBe(Collections.singletonList(
UnitStatus.builder(id)
- .append(unit.version, List.of(node.name(),
cmg.name()))
- .build())
+ .append(unit.version,
+ DeploymentInfo.builder()
+ .status(DEPLOYED)
+
.addConsistentId(unit.deployedNode.name())
+ .addConsistentId(cmg.name())
+ .build()
+ ).build()
+ )
)
);
- nodes = node.deployment().findUnitByConsistentIdAsync(node.name());
+ nodes = node.deployment().findUnitByConsistentIdAsync(cmg.name());
assertThat(nodes, willBe(Collections.singletonList(
UnitStatus.builder(id)
- .append(unit.version, List.of(node.name(),
cmg.name()))
- .build())
+ .append(unit.version,
+ DeploymentInfo.builder()
+ .status(DEPLOYED)
+
.addConsistentId(unit.deployedNode.name())
+ .addConsistentId(cmg.name())
+ .build()
+ ).build()
)
- );
+ ));
nodes =
node.deployment().findUnitByConsistentIdAsync("not-existed-node");
assertThat(nodes, willBe(Collections.emptyList()));
}
- private Unit deployAndVerify(String id, Version version, int nodeIndex) {
+ private UnitStatus buildStatus(String id, Unit... units) {
+ IgniteImpl cmg = cluster.node(0);
+
+ UnitStatusBuilder builder = UnitStatus.builder(id);
+ for (Unit unit : units) {
+ builder.append(unit.version,
+ DeploymentInfo.builder()
+ .status(DEPLOYED)
+ .addConsistentId(unit.deployedNode.name())
+ .addConsistentId(cmg.name())
+ .build()
+ );
+ }
+
+ return builder.build();
+ }
+
+ private Unit deployAndVerify(String id, Version version, DeployFile file,
int nodeIndex) {
IgniteImpl entryNode = node(nodeIndex);
CompletableFuture<Boolean> deploy = entryNode.deployment()
- .deployAsync(id, version, fromPath(dummyFile));
+ .deployAsync(id, version, fromPath(file.file));
assertThat(deploy, willBe(true));
- Unit unit = new Unit(entryNode, id, version);
- assertTrue(Files.exists(getNodeUnitFile(unit)));
+ Unit unit = new Unit(entryNode, id, version, file);
+ Path nodeUnitFile = getNodeUnitFile(unit);
+ assertTrue(Files.exists(nodeUnitFile));
return unit;
}
+ private Unit deployAndVerifySmall(String id, Version version, int
nodeIndex) {
+ return deployAndVerify(id, version, smallFile, nodeIndex);
+ }
+
+ private Unit deployAndVerifyMedium(String id, Version version, int
nodeIndex) {
+ return deployAndVerify(id, version, mediumFile, nodeIndex);
+ }
+
+ private Unit deployAndVerifyBig(String id, Version version, int nodeIndex)
{
+ return deployAndVerify(id, version, bigFile, nodeIndex);
+ }
+
private Path getNodeUnitFile(Unit unit) {
- return getNodeUnitFile(unit.deployedNode, unit.id, unit.version);
+ return getNodeUnitFile(unit.deployedNode, unit.id, unit.version,
unit.file);
}
- private Path getNodeUnitFile(IgniteImpl node, String unitId, Version
unitVersion) {
+ private Path getNodeUnitFile(IgniteImpl node, String unitId, Version
unitVersion, DeployFile file) {
String deploymentFolder = node.nodeConfiguration()
.getConfiguration(DeploymentConfiguration.KEY)
.deploymentLocation().value();
Path resolve = workDir.resolve(node.name()).resolve(deploymentFolder);
return resolve.resolve(unitId)
.resolve(unitVersion.render())
- .resolve(dummyFile.getFileName());
+ .resolve(file.file.getFileName());
}
- private void waitUnitReplica(IgniteImpl ignite, Unit unit) throws
InterruptedException {
- Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
- assertTrue(IgniteTestUtils.waitForCondition(() -> {
- try {
- return Files.exists(unitPath) && Files.size(unitPath) ==
SIZE_IN_BYTES;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, REPLICA_TIMEOUT));
+ private void waitUnitReplica(IgniteImpl ignite, Unit unit) {
+ Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version,
unit.file);
+
+ await().timeout(unit.file.replicaTimeout, SECONDS)
+ .pollDelay(1, SECONDS)
+ .ignoreException(IOException.class)
+ .until(() -> Files.exists(unitPath) && Files.size(unitPath) ==
unit.file.expectedSize);
}
- private void waitUnitClean(IgniteImpl ignite, Unit unit) throws
InterruptedException {
- Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
- assertTrue(IgniteTestUtils.waitForCondition(() ->
!Files.exists(unitPath), REPLICA_TIMEOUT));
+ private void waitUnitClean(IgniteImpl ignite, Unit unit) {
+ Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version,
unit.file);
+
+ await().timeout(unit.file.replicaTimeout, SECONDS)
+ .pollDelay(2, SECONDS)
+ .until(() -> !Files.exists(unitPath));
}
class Unit {
@@ -212,19 +306,56 @@ public class ItDeploymentUnitTest extends
ClusterPerTestIntegrationTest {
private final Version version;
- Unit(IgniteImpl deployedNode, String id, Version version) {
+ private final DeployFile file;
+
+ Unit(IgniteImpl deployedNode, String id, Version version, DeployFile
file) {
this.deployedNode = deployedNode;
this.id = id;
this.version = version;
+ this.file = file;
}
- void undeploy() throws InterruptedException {
+ CompletableFuture<Void> undeployAsync() {
+ return deployedNode.deployment().undeployAsync(id, version);
+ }
+
+ void undeploy() {
deployedNode.deployment().undeployAsync(id, version);
waitUnitClean(deployedNode, this);
}
}
- static DeploymentUnit fromPath(Path path) {
+ private static class DeployFile {
+ private final Path file;
+
+ private final long expectedSize;
+
+ private final int replicaTimeout;
+
+ private DeployFile(Path file, long expectedSize, int replicaTimeout) {
+ this.file = file;
+ this.expectedSize = expectedSize;
+ this.replicaTimeout = replicaTimeout;
+ }
+
+ public void ensureExist() throws IOException {
+ ensureFile(file, expectedSize);
+ }
+
+ private static void ensureFile(Path path, long size) throws
IOException {
+ if (!Files.exists(path)) {
+ try (SeekableByteChannel channel = Files.newByteChannel(path,
WRITE, CREATE)) {
+ channel.position(size - 4);
+
+ ByteBuffer buf = ByteBuffer.allocate(4).putInt(2);
+ buf.rewind();
+ channel.write(buf);
+ }
+ }
+ }
+ }
+
+ private static DeploymentUnit fromPath(Path path) {
Objects.requireNonNull(path);
return new DeploymentUnit() {