This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1539873b49 IGNITE-21757 Fix redeploy (#3528)
1539873b49 is described below
commit 1539873b49f041b4f39844d512012685d09d33ea
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Fri Apr 5 10:14:19 2024 +0300
IGNITE-21757 Fix redeploy (#3528)
A UUID is now used as the operation ID, which fixes the ABA problem.
---
.../ignite/internal/deployunit/UnitStatus.java | 26 +++-----
.../metastore/ClusterEventCallbackImpl.java | 19 +++---
.../metastore/DeploymentUnitFailover.java | 14 +++--
.../deployunit/metastore/DeploymentUnitStore.java | 9 ++-
.../metastore/DeploymentUnitStoreImpl.java | 33 +++++++---
.../metastore/status/SerializeUtils.java | 34 +++++++++-
.../metastore/status/UnitClusterStatus.java | 24 +++++---
.../metastore/status/UnitNodeStatus.java | 35 +++++------
.../deployment/UnitStatusesSerializerTest.java | 21 ++++---
.../metastore/DeploymentUnitStoreImplTest.java | 72 ++++++++++++++++++----
.../deployunit/DeploymentManagerImplTest.java | 15 +++--
11 files changed, 202 insertions(+), 100 deletions(-)
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
index 169fb174bb..0a85c60e7b 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.deployunit;
+import java.util.Objects;
+import java.util.UUID;
import org.apache.ignite.compute.version.Version;
/**
@@ -38,7 +40,7 @@ public abstract class UnitStatus {
*/
private DeploymentStatus status;
- private final long opId;
+ private final UUID opId;
/**
* Constructor.
@@ -48,7 +50,7 @@ public abstract class UnitStatus {
* @param status Unit status.
* @param opId Deployment unit operation identifier.
*/
- public UnitStatus(String id, Version version, DeploymentStatus status,
long opId) {
+ public UnitStatus(String id, Version version, DeploymentStatus status,
UUID opId) {
this.id = id;
this.version = version;
this.status = status;
@@ -96,7 +98,7 @@ public abstract class UnitStatus {
*
* @return Operation identifier of deployment unit creation.
*/
- public long opId() {
+ public UUID opId() {
return opId;
}
@@ -108,24 +110,14 @@ public abstract class UnitStatus {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- UnitStatus meta = (UnitStatus) o;
-
- if (id != null ? !id.equals(meta.id) : meta.id != null) {
- return false;
- }
- if (version != null ? !version.equals(meta.version) : meta.version !=
null) {
- return false;
- }
- return status == meta.status;
+ UnitStatus that = (UnitStatus) o;
+ return Objects.equals(id, that.id) && Objects.equals(version,
that.version) && status == that.status
+ && Objects.equals(opId, that.opId);
}
@Override
public int hashCode() {
- int result = id != null ? id.hashCode() : 0;
- result = 31 * result + (version != null ? version.hashCode() : 0);
- result = 31 * result + (status != null ? status.hashCode() : 0);
- return result;
+ return Objects.hash(id, version, status, opId);
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
index b8130502e8..189f2ebef4 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.deployunit.metastore;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.deployunit.FileDeployerService;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
/** Listener of deployment unit cluster status changes. */
public class ClusterEventCallbackImpl extends ClusterEventCallback {
@@ -64,35 +66,36 @@ public class ClusterEventCallbackImpl extends
ClusterEventCallback {
// Now the deployment unit can be removed from each target node and,
after it, remove corresponding status records.
deploymentUnitStore.getNodeStatus(nodeName, id,
version).thenAccept(nodeStatus -> {
if (nodeStatus != null && nodeStatus.status() == REMOVING) {
- undeploy(id, version);
+ undeploy(id, version, nodeStatus.opId());
}
});
}
- private void undeploy(String id, Version version) {
+ private void undeploy(String id, Version version, UUID opId) {
deployerService.undeploy(id, version).thenAccept(success -> {
if (success) {
- deploymentUnitStore.removeNodeStatus(nodeName, id,
version).thenAccept(successRemove -> {
+ deploymentUnitStore.removeNodeStatus(nodeName, id, version,
opId).thenAccept(successRemove -> {
if (successRemove) {
- removeClusterStatus(id, version);
+ removeClusterStatus(id, version, opId);
}
});
}
});
}
- private void removeClusterStatus(String id, Version version) {
+ private void removeClusterStatus(String id, Version version, UUID opId) {
cmgManager.logicalTopology().thenAccept(logicalTopology -> {
Set<String> logicalNodes = logicalTopology.nodes().stream()
.map(LogicalNode::name)
.collect(Collectors.toSet());
- deploymentUnitStore.getAllNodes(id, version).thenAccept(nodes -> {
- boolean emptyTopology = nodes.stream()
+ deploymentUnitStore.getAllNodeStatuses(id,
version).thenAccept(statuses -> {
+ boolean emptyTopology = statuses.stream()
+ .map(UnitNodeStatus::nodeId)
.filter(logicalNodes::contains)
.findAny()
.isEmpty();
if (emptyTopology) {
- deploymentUnitStore.removeClusterStatus(id, version);
+ deploymentUnitStore.removeClusterStatus(id, version, opId);
}
});
});
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
index 5f915341c2..ee07fb5edb 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import java.util.Objects;
+import java.util.UUID;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
@@ -92,7 +93,7 @@ public class DeploymentUnitFailover {
Version version = unitNodeStatus.version();
if (unitClusterStatus == null) {
- undeploy(id, version);
+ undeploy(id, version, unitNodeStatus.opId());
return;
}
@@ -126,11 +127,11 @@ public class DeploymentUnitFailover {
}
}
- private void undeploy(String id, Version version) {
+ private void undeploy(String id, Version version, UUID opId) {
deployer.undeploy(id, version)
.thenAccept(success -> {
if (success) {
- deploymentUnitStore.removeNodeStatus(nodeName, id,
version);
+ deploymentUnitStore.removeNodeStatus(nodeName, id,
version, opId);
}
});
}
@@ -138,11 +139,12 @@ public class DeploymentUnitFailover {
private boolean checkAbaProblem(UnitClusterStatus clusterStatus,
UnitNodeStatus nodeStatus) {
String id = nodeStatus.id();
Version version = nodeStatus.version();
- if (clusterStatus.opId() != nodeStatus.opId()) {
+ UUID opId = nodeStatus.opId();
+ if (!Objects.equals(clusterStatus.opId(), opId)) {
if (nodeStatus.status() == DEPLOYED) {
- undeploy(id, version);
+ undeploy(id, version, opId);
} else {
- deploymentUnitStore.removeNodeStatus(nodeName, id, version);
+ deploymentUnitStore.removeNodeStatus(nodeName, id, version,
opId);
}
return true;
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
index 74e87beb00..13fd19f2ae 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployunit.metastore;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
@@ -133,7 +134,7 @@ public interface DeploymentUnitStore {
String nodeId,
String id,
Version version,
- long opId,
+ UUID opId,
DeploymentStatus status);
/**
@@ -179,16 +180,18 @@ public interface DeploymentUnitStore {
*
* @param id Deployment unit identifier.
* @param version Deployment version identifier.
+ * @param opId Operation identifier.
* @return Future with {@code true} result if removed successfully.
*/
- CompletableFuture<Boolean> removeClusterStatus(String id, Version version);
+ CompletableFuture<Boolean> removeClusterStatus(String id, Version version,
UUID opId);
/**
* Removes node status.
*
* @param id Deployment unit identifier.
* @param version Deployment version identifier.
+ * @param opId Operation identifier.
* @return Future with {@code true} result if removed successfully.
*/
- CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id,
Version version);
+ CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id,
Version version, UUID opId);
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
index e454b37908..fb15c86285 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
@@ -25,11 +25,13 @@ import static
org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -43,7 +45,6 @@ import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
/**
* Implementation of {@link DeploymentUnitStore} based on {@link
MetaStorageManager}.
@@ -142,8 +143,8 @@ public class DeploymentUnitStoreImpl implements
DeploymentUnitStore {
@Override
public CompletableFuture<UnitClusterStatus> createClusterStatus(String id,
Version version, Set<String> nodes) {
ByteArray key =
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
- long revision = metaStorage.appliedRevision();
- UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version,
UPLOADING, revision, nodes);
+ UUID operationId = UUID.randomUUID();
+ UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version,
UPLOADING, operationId, nodes);
byte[] value = UnitClusterStatus.serialize(clusterStatus);
return metaStorage.invoke(notExists(key), put(key, value), noop())
@@ -155,7 +156,7 @@ public class DeploymentUnitStoreImpl implements
DeploymentUnitStore {
String nodeId,
String id,
Version version,
- long opId,
+ UUID opId,
DeploymentStatus status
) {
ByteArray key =
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray();
@@ -208,17 +209,31 @@ public class DeploymentUnitStoreImpl implements
DeploymentUnitStore {
}
@Override
- public CompletableFuture<Boolean> removeClusterStatus(String id, Version
version) {
+ public CompletableFuture<Boolean> removeClusterStatus(String id, Version
version, UUID opId) {
ByteArray key =
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
- return metaStorage.invoke(exists(key), Operations.remove(key), noop());
+ return metaStorage.get(key).thenCompose(e -> {
+ UnitClusterStatus prev = UnitClusterStatus.deserialize(e.value());
+ if (!Objects.equals(prev.opId(), opId)) {
+ return falseCompletedFuture();
+ }
+
+ return metaStorage.invoke(revision(key).eq(e.revision()),
remove(key), noop());
+ });
}
@Override
- public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String
id, Version version) {
+ public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String
id, Version version, UUID opId) {
ByteArray key =
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray();
- return metaStorage.invoke(exists(key), Operations.remove(key), noop());
+ return metaStorage.get(key).thenCompose(e -> {
+ UnitNodeStatus prev = UnitNodeStatus.deserialize(e.value());
+ if (!Objects.equals(prev.opId(), opId)) {
+ return falseCompletedFuture();
+ }
+
+ return metaStorage.invoke(revision(key).eq(e.revision()),
remove(key), noop());
+ });
}
/**
@@ -244,7 +259,7 @@ public class DeploymentUnitStoreImpl implements
DeploymentUnitStore {
}
return metaStorage.invoke(
- force ? exists(key) :
revision(key).le(e.revision()),
+ force ? exists(key) :
revision(key).eq(e.revision()),
put(key, newValue),
noop())
.thenCompose(finished -> {
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
index 120fc60edd..6068ed80eb 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
@@ -24,8 +24,13 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.compute.version.VersionParseException;
+import org.apache.ignite.internal.deployunit.DeploymentStatus;
import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.jetbrains.annotations.Nullable;
/**
* Serializer for {@link UnitStatus}.
@@ -106,7 +111,34 @@ public final class SerializeUtils {
return new String(Base64.getDecoder().decode(s), UTF_8);
}
- public static boolean checkElement(String[] arr, int index) {
+ static boolean checkElement(String[] arr, int index) {
return arr.length > index && arr[index] != null &&
!arr[index].isBlank();
}
+
+ @Nullable
+ static Version deserializeVersion(String[] values, int index) {
+ try {
+ return checkElement(values, index) ?
Version.parseVersion(decode(values[index])) : null;
+ } catch (VersionParseException e) {
+ return null;
+ }
+ }
+
+ @Nullable
+ static DeploymentStatus deserializeStatus(String[] values, int index) {
+ try {
+ return checkElement(values, index) ?
DeploymentStatus.valueOf(decode(values[index])) : null;
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
+
+ @Nullable
+ static UUID deserializeUuid(String[] values, int index) {
+ try {
+ return checkElement(values, index) ?
UUID.fromString(decode(values[index])) : null;
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
index f1b7b2728d..63939b7f1a 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
@@ -18,12 +18,19 @@
package org.apache.ignite.internal.deployunit.metastore.status;
import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.checkElement;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decode;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decodeAsSet;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeStatus;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeUuid;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeVersion;
import java.util.Collections;
import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.jetbrains.annotations.Nullable;
/**
* Deployment unit cluster status.
@@ -44,7 +51,7 @@ public class UnitClusterStatus extends UnitStatus {
String id,
Version version,
DeploymentStatus status,
- long opId,
+ UUID opId,
Set<String> initialNodesToDeploy
) {
super(id, version, status, opId);
@@ -102,19 +109,18 @@ public class UnitClusterStatus extends UnitStatus {
* @param value Serialized deployment unit cluster status.
* @return Deserialized deployment unit cluster status.
*/
- public static UnitClusterStatus deserialize(byte[] value) {
+ public static UnitClusterStatus deserialize(byte @Nullable [] value) {
if (value == null || value.length == 0) {
- return new UnitClusterStatus(null, null, null, 0, Set.of());
+ return new UnitClusterStatus(null, null, null, null, Set.of());
}
String[] values = SerializeUtils.deserialize(value);
- String id = checkElement(values, 0) ? SerializeUtils.decode(values[0])
: null;
- Version version = checkElement(values, 1) ?
Version.parseVersion(SerializeUtils.decode(values[1])) : null;
- DeploymentStatus status = checkElement(values, 2) ?
DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null;
- long opId = checkElement(values, 3) ?
Long.parseLong(SerializeUtils.decode(values[3])) : 0;
- Set<String> nodes = checkElement(values, 4) ?
SerializeUtils.decodeAsSet(values[4]) : Set.of();
-
+ String id = checkElement(values, 0) ? decode(values[0]) : null;
+ Version version = deserializeVersion(values, 1);
+ DeploymentStatus status = deserializeStatus(values, 2);
+ UUID opId = deserializeUuid(values, 3);
+ Set<String> nodes = checkElement(values, 4) ? decodeAsSet(values[4]) :
Set.of();
return new UnitClusterStatus(id, version, status, opId, nodes);
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
index 92dee426ab..0c32b788e3 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
@@ -18,11 +18,16 @@
package org.apache.ignite.internal.deployunit.metastore.status;
import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.checkElement;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decode;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeStatus;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeUuid;
+import static
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeVersion;
+import java.util.UUID;
import org.apache.ignite.compute.version.Version;
-import org.apache.ignite.compute.version.VersionParseException;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.jetbrains.annotations.Nullable;
/**
* Deployment unit node status.
@@ -39,7 +44,7 @@ public class UnitNodeStatus extends UnitStatus {
* @param opId Deployment unit operation identifier.
* @param nodeId Node consistent id.
*/
- public UnitNodeStatus(String id, Version version, DeploymentStatus status,
long opId, String nodeId) {
+ public UnitNodeStatus(String id, Version version, DeploymentStatus status,
UUID opId, String nodeId) {
super(id, version, status, opId);
this.nodeId = nodeId;
}
@@ -94,30 +99,18 @@ public class UnitNodeStatus extends UnitStatus {
* @param value Serialized deployment unit node status.
* @return Deserialized deployment unit node status.
*/
- public static UnitNodeStatus deserialize(byte[] value) {
+ public static UnitNodeStatus deserialize(byte @Nullable [] value) {
if (value == null || value.length == 0) {
- return new UnitNodeStatus(null, null, null, 0, null);
+ return new UnitNodeStatus(null, null, null, null, null);
}
String[] values = SerializeUtils.deserialize(value);
- String id = checkElement(values, 0) ? SerializeUtils.decode(values[0])
: null;
- Version version;
- try {
- version = checkElement(values, 1) ?
Version.parseVersion(SerializeUtils.decode(values[1])) : null;
- } catch (VersionParseException e) {
- version = null;
- }
- DeploymentStatus status;
- try {
- status = checkElement(values, 2) ?
DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null;
- } catch (IllegalArgumentException e) {
- status = null;
- }
-
- long opId = checkElement(values, 3) ?
Long.parseLong(SerializeUtils.decode(values[3])) : 0;
-
- String nodeId = checkElement(values, 4) ?
SerializeUtils.decode(values[4]) : null;
+ String id = checkElement(values, 0) ? decode(values[0]) : null;
+ Version version = deserializeVersion(values, 1);
+ DeploymentStatus status = deserializeStatus(values, 2);
+ UUID opId = deserializeUuid(values, 3);
+ String nodeId = checkElement(values, 4) ? decode(values[4]) : null;
return new UnitNodeStatus(id, version, status, opId, nodeId);
}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
index 27a4b38a82..5b4609ac53 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
@@ -24,6 +24,7 @@ import static
org.junit.jupiter.params.provider.Arguments.arguments;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils;
@@ -39,19 +40,19 @@ import org.junit.jupiter.params.provider.MethodSource;
public class UnitStatusesSerializerTest {
private static List<Arguments> nodeStatusProvider() {
return List.of(
- arguments(null, null, null, 0, null),
- arguments("id", null, null, 0, null),
- arguments("id", Version.LATEST, null, 0, null),
- arguments("id", Version.LATEST, UPLOADING, 10, "node1")
+ arguments(null, null, null, null, null),
+ arguments("id", null, null, UUID.randomUUID(), null),
+ arguments("id", Version.LATEST, null, UUID.randomUUID(), null),
+ arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(),
"node1")
);
}
private static List<Arguments> clusterStatusProvider() {
return List.of(
- arguments("id", Version.LATEST, UPLOADING, 0, Set.of()),
- arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1")),
- arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1",
"node2")),
- arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1",
"node2", "node3"))
+ arguments("id", Version.LATEST, UPLOADING, null, Set.of()),
+ arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(),
Set.of("node1")),
+ arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(),
Set.of("node1", "node2")),
+ arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(),
Set.of("node1", "node2", "node3"))
);
}
@@ -61,7 +62,7 @@ public class UnitStatusesSerializerTest {
String id,
Version version,
DeploymentStatus status,
- long opId,
+ UUID opId,
String nodeId
) {
UnitNodeStatus nodeStatus = new UnitNodeStatus(id, version, status,
opId, nodeId);
@@ -77,7 +78,7 @@ public class UnitStatusesSerializerTest {
String id,
Version version,
DeploymentStatus status,
- long opId,
+ UUID opId,
Set<String> consistentIdLocation
) {
UnitClusterStatus nodeStatus = new UnitClusterStatus(id, version,
status, opId, consistentIdLocation);
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 22c43fc572..8ad55e40bc 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
@@ -129,7 +130,7 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
assertThat(clusterStatusFuture, willCompleteSuccessfully());
UnitClusterStatus clusterStatus = clusterStatusFuture.get();
- long opId = clusterStatus.opId();
+ UUID opId = clusterStatus.opId();
assertThat(metastore.getClusterStatus(id, version),
willBe(new UnitClusterStatus(id, version, UPLOADING, opId,
Set.of())));
@@ -138,11 +139,59 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
assertThat(metastore.getClusterStatus(id, version),
willBe(new UnitClusterStatus(id, version, DEPLOYED, opId,
Set.of())));
- assertThat(metastore.removeClusterStatus(id, version), willBe(true));
+ assertThat(metastore.removeClusterStatus(id, version, opId),
willBe(true));
assertThat(metastore.getClusterStatus(id, version),
willBe(nullValue()));
}
+ @Test
+ void clusterStatusAba() {
+ String id = "id1";
+ Version version = Version.parseVersion("1.1.1");
+
+ CompletableFuture<UnitClusterStatus> clusterStatusFuture1 =
metastore.createClusterStatus(id, version, Set.of());
+ assertThat(clusterStatusFuture1, willCompleteSuccessfully());
+
+ UUID opId1 = clusterStatusFuture1.join().opId();
+
+ assertThat(metastore.removeClusterStatus(id, version, opId1),
willBe(true));
+
+ // Create new cluster status with the same id and version
+ CompletableFuture<UnitClusterStatus> clusterStatusFuture2 =
metastore.createClusterStatus(id, version, Set.of());
+ assertThat(clusterStatusFuture2, willCompleteSuccessfully());
+
+ UUID opId2 = clusterStatusFuture2.join().opId();
+
+ // Remove with the initial operation ID should fail
+ assertThat(metastore.removeClusterStatus(id, version, opId1),
willBe(false));
+
+ // Remove with the correct operation ID should succeed
+ assertThat(metastore.removeClusterStatus(id, version, opId2),
willBe(true));
+ }
+
+ @Test
+ void nodeStatusAba() {
+ String id = "id1";
+ Version version = Version.parseVersion("1.1.1");
+ String node1 = "node1";
+
+ UUID opId1 = UUID.randomUUID();
+ UUID opId2 = UUID.randomUUID();
+
+ assertThat(metastore.createNodeStatus(node1, id, version, opId1,
UPLOADING), willBe(true));
+
+ assertThat(metastore.removeNodeStatus(node1, id, version, opId1),
willBe(true));
+
+ // Create new node status with the same id and version
+ assertThat(metastore.createNodeStatus(node1, id, version, opId2,
UPLOADING), willBe(true));
+
+ // Remove with the initial operation ID should fail
+ assertThat(metastore.removeNodeStatus(node1, id, version, opId1),
willBe(false));
+
+ // Remove with the correct operation ID should succeed
+ assertThat(metastore.removeNodeStatus(node1, id, version, opId2),
willBe(true));
+ }
+
@Test
public void nodeStatusTest() throws Exception {
String id = "id2";
@@ -156,7 +205,7 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
assertThat(clusterStatusFuture, willCompleteSuccessfully());
UnitClusterStatus clusterStatus = clusterStatusFuture.get();
- long opId = clusterStatus.opId();
+ UUID opId = clusterStatus.opId();
assertThat(metastore.getClusterStatus(id, version),
willBe(new UnitClusterStatus(id, version, UPLOADING, opId,
Set.of(node1, node2, node3))));
@@ -183,11 +232,11 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
willBe(contains((new UnitClusterStatus(id, version, DEPLOYED,
opId, Set.of(node1, node2, node3)))))
);
- assertThat(metastore.removeClusterStatus(id, version), willBe(true));
+ assertThat(metastore.removeClusterStatus(id, version, opId),
willBe(true));
assertThat(metastore.getNodeStatus(node1, id, version),
willBe(new UnitNodeStatus(id, version, DEPLOYED, opId,
node1)));
- assertThat(metastore.removeNodeStatus(node1, id, version),
willBe(true));
+ assertThat(metastore.removeNodeStatus(node1, id, version, opId),
willBe(true));
assertThat(metastore.getNodeStatus(node1, id, version),
willBe(nullValue()));
}
@@ -197,17 +246,18 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
Version version = Version.parseVersion("1.1.1");
String node1 = LOCAL_NODE;
- assertThat(metastore.createNodeStatus(node1, id, version, 0,
UPLOADING), willBe(true));
+ UUID opId = UUID.randomUUID();
+ assertThat(metastore.createNodeStatus(node1, id, version, opId,
UPLOADING), willBe(true));
assertThat(metastore.updateNodeStatus(node1, id, version, DEPLOYED),
willBe(true));
assertThat(metastore.updateNodeStatus(node1, id, version, OBSOLETE),
willBe(true));
assertThat(metastore.updateNodeStatus(node1, id, version, REMOVING),
willBe(true));
await().untilAsserted(() ->
assertThat(nodeHistory, containsInAnyOrder(
- new UnitNodeStatus(id, version, UPLOADING, 0, node1),
- new UnitNodeStatus(id, version, DEPLOYED, 0, node1),
- new UnitNodeStatus(id, version, OBSOLETE, 0, node1),
- new UnitNodeStatus(id, version, REMOVING, 0, node1)
+ new UnitNodeStatus(id, version, UPLOADING, opId,
node1),
+ new UnitNodeStatus(id, version, DEPLOYED, opId, node1),
+ new UnitNodeStatus(id, version, OBSOLETE, opId, node1),
+ new UnitNodeStatus(id, version, REMOVING, opId, node1)
)));
}
@@ -220,7 +270,7 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
assertThat(clusterStatusFuture, willCompleteSuccessfully());
UnitClusterStatus clusterStatus = clusterStatusFuture.get();
- long opId = clusterStatus.opId();
+ UUID opId = clusterStatus.opId();
assertThat(metastore.updateClusterStatus(id, version, DEPLOYED),
willBe(true));
assertThat(metastore.updateClusterStatus(id, version, OBSOLETE),
willBe(true));
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
index a763fa5ed3..28d1da95a9 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
@@ -18,6 +18,10 @@
package org.apache.ignite.internal.deployunit;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -26,6 +30,7 @@ import static org.mockito.Mockito.doReturn;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -66,11 +71,11 @@ class DeploymentManagerImplTest extends
BaseIgniteAbstractTest {
@Test
public void detectLatestDeployedVersion() {
List<UnitClusterStatus> unitClusterStatuses = List.of(
- new UnitClusterStatus("unit", Version.parseVersion("1.0.0"),
DeploymentStatus.DEPLOYED, 0, Set.of("node1", "node2")),
- new UnitClusterStatus("unit", Version.parseVersion("1.0.1"),
DeploymentStatus.OBSOLETE, 0, Set.of("node1", "node2")),
- new UnitClusterStatus("unit", Version.parseVersion("1.0.2"),
DeploymentStatus.DEPLOYED, 0, Set.of("node1", "node2")),
- new UnitClusterStatus("unit", Version.parseVersion("1.0.3"),
DeploymentStatus.UPLOADING, 0, Set.of("node1", "node2")),
- new UnitClusterStatus("unit", Version.parseVersion("1.0.4"),
DeploymentStatus.REMOVING, 0, Set.of("node1", "node2"))
+ new UnitClusterStatus("unit", Version.parseVersion("1.0.0"),
DEPLOYED, UUID.randomUUID(), Set.of("node1", "node2")),
+ new UnitClusterStatus("unit", Version.parseVersion("1.0.1"),
OBSOLETE, UUID.randomUUID(), Set.of("node1", "node2")),
+ new UnitClusterStatus("unit", Version.parseVersion("1.0.2"),
DEPLOYED, UUID.randomUUID(), Set.of("node1", "node2")),
+ new UnitClusterStatus("unit", Version.parseVersion("1.0.3"),
UPLOADING, UUID.randomUUID(), Set.of("node1", "node2")),
+ new UnitClusterStatus("unit", Version.parseVersion("1.0.4"),
REMOVING, UUID.randomUUID(), Set.of("node1", "node2"))
);
doReturn(completedFuture(unitClusterStatuses)).when(deploymentUnitStore).getClusterStatuses("unit");