This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5d3a9ad69e IGNITE-19708 Check refcounter of unit before undeploy
(#2219)
5d3a9ad69e is described below
commit 5d3a9ad69e356646d7c4b8e7c8315b0ab98b3e61
Author: Ivan Gagarkin <[email protected]>
AuthorDate: Tue Jun 27 19:26:54 2023 +0400
IGNITE-19708 Check refcounter of unit before undeploy (#2219)
---
.../internal/deployunit/DefaultNodeCallback.java | 9 +-
.../internal/deployunit/DeploymentManagerImpl.java | 32 ++++-
.../deployunit/DeploymentUnitAccessor.java | 8 +-
.../deployunit/DeploymentUnitAccessorImpl.java | 39 ++++--
.../deployunit/DeploymentUnitAcquiredWaiter.java | 103 ++++++++++++++
.../deployunit/DeploymentUnitAccessorImplTest.java | 53 +++++++
.../DeploymentUnitAcquiredWaiterTest.java | 155 +++++++++++++++++++++
.../compute/ClassLoaderExceptionsMapper.java | 8 +-
modules/compute/src/test/README.md | 6 +-
.../compute/ClassLoaderExceptionsMapperTest.java | 4 +-
...OT-src.zip => ignite-jobs-1.0-SNAPSHOT-src.zip} | Bin
.../unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar | Bin 0 -> 1749 bytes
.../units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar | Bin 1749 -> 0 bytes
.../unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar | Bin 0 -> 1724 bytes
.../units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar | Bin 1724 -> 0 bytes
.../unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar | Bin 0 -> 1749 bytes
.../unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar | Bin 0 -> 1724 bytes
.../units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar | Bin 1749 -> 0 bytes
.../units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar | Bin 1724 -> 0 bytes
.../unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar | Bin 0 -> 1749 bytes
.../3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar | Bin 0 -> 1724 bytes
.../unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar | Bin 1724 -> 0 bytes
.../units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar | Bin 1749 -> 0 bytes
.../unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar | Bin 0 -> 1749 bytes
.../units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar | Bin 1749 -> 0 bytes
.../unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar | Bin 0 -> 1724 bytes
.../units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar | Bin 1724 -> 0 bytes
.../cpp/tests/client-test/compute_test.cpp | 8 +-
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 4 +-
.../internal/compute/ItComputeTestStandalone.java | 49 ++++++-
.../runner/app/client/ItThinClientComputeTest.java | 4 +-
.../units/ignite-it-jobs-1.0-SNAPSHOT.jar | Bin 0 -> 5528 bytes
...OT-src.zip => ignite-jobs-1.0-SNAPSHOT-src.zip} | Bin
.../resources/units/ignite-jobs-1.0-SNAPSHOT.jar | Bin 4773 -> 0 bytes
.../units/ignite-ut-job1-1.0-SNAPSHOT.jar | Bin 0 -> 1749 bytes
.../units/ignite-ut-job2-1.0-SNAPSHOT.jar | Bin 0 -> 1724 bytes
.../resources/units/unit1-1.0-SNAPSHOT.jar | Bin 1749 -> 0 bytes
.../resources/units/unit2-1.0-SNAPSHOT.jar | Bin 1724 -> 0 bytes
38 files changed, 444 insertions(+), 38 deletions(-)
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index a84f1e858a..d657227a5e 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -42,6 +43,8 @@ public class DefaultNodeCallback extends NodeEventCallback {
private final FileDeployerService deployer;
+ private final DeploymentUnitAcquiredWaiter undeployer;
+
private final DownloadTracker tracker;
private final ClusterManagementGroupManager cmgManager;
@@ -54,6 +57,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
* @param deploymentUnitStore Deployment units store.
* @param messaging Deployment messaging service.
* @param deployer Deployment unit file system service.
+ * @param undeployer Deployment unit undeployer.
* @param cmgManager Cluster management group manager.
* @param nodeName Node consistent ID.
*/
@@ -61,6 +65,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
DeploymentUnitStore deploymentUnitStore,
DeployMessagingService messaging,
FileDeployerService deployer,
+ DeploymentUnitAcquiredWaiter undeployer,
DownloadTracker tracker,
ClusterManagementGroupManager cmgManager,
String nodeName
@@ -68,6 +73,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
this.deploymentUnitStore = deploymentUnitStore;
this.messaging = messaging;
this.deployer = deployer;
+ this.undeployer = undeployer;
this.tracker = tracker;
this.cmgManager = cmgManager;
this.nodeName = nodeName;
@@ -102,8 +108,7 @@ public class DefaultNodeCallback extends NodeEventCallback {
@Override
public void onObsolete(String id, Version version, List<UnitNodeStatus>
holders) {
- //TODO: IGNITE-19708
- deploymentUnitStore.updateNodeStatus(nodeName, id, version, REMOVING);
+ undeployer.submitToAcquireRelease(new DeploymentUnit(id, version));
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 9e7fd0a99f..ba657e6104 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -22,15 +22,18 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -60,6 +63,11 @@ import org.jetbrains.annotations.Nullable;
public class DeploymentManagerImpl implements IgniteDeployment {
private static final IgniteLogger LOG =
Loggers.forClass(DeploymentManagerImpl.class);
+ /**
+ * Delay for undeployer.
+ */
+ private static final Duration UNDEPLOYER_DELAY = Duration.ofSeconds(5);
+
/**
* Node working directory.
*/
@@ -105,6 +113,8 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
*/
private final DeploymentUnitAccessor deploymentUnitAccessor;
+ private final DeploymentUnitAcquiredWaiter undeployer;
+
private final String nodeName;
private final NodeEventCallback nodeStatusCallback;
@@ -138,14 +148,26 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
this.configuration = configuration;
this.cmgManager = cmgManager;
this.workDir = workDir;
+ this.nodeName = nodeName;
tracker = new DownloadTracker();
deployer = new FileDeployerService();
- messaging = new DeployMessagingService(clusterService, cmgManager,
deployer, tracker);
deploymentUnitAccessor = new DeploymentUnitAccessorImpl(deployer);
+ undeployer = new DeploymentUnitAcquiredWaiter(
+ nodeName,
+ deploymentUnitAccessor,
+ unit -> deploymentUnitStore.updateNodeStatus(nodeName,
unit.name(), unit.version(), REMOVING)
+ );
+ messaging = new DeployMessagingService(clusterService, cmgManager,
deployer, tracker);
- this.nodeName = nodeName;
-
- nodeStatusCallback = new DefaultNodeCallback(deploymentUnitStore,
messaging, deployer, tracker, cmgManager, nodeName);
+ nodeStatusCallback = new DefaultNodeCallback(
+ deploymentUnitStore,
+ messaging,
+ deployer,
+ undeployer,
+ tracker,
+ cmgManager,
+ nodeName
+ );
nodeStatusWatchListener = new
NodeStatusWatchListener(deploymentUnitStore, nodeName, nodeStatusCallback);
clusterEventCallback = new
ClusterEventCallbackImpl(deploymentUnitStore, deployer, cmgManager, nodeName);
@@ -373,6 +395,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener);
messaging.subscribe();
failover.registerTopologyChangeCallback(nodeStatusCallback,
clusterEventCallback);
+ undeployer.start(UNDEPLOYER_DELAY.getSeconds(), TimeUnit.SECONDS);
}
@Override
@@ -381,6 +404,7 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
tracker.cancelAll();
deploymentUnitStore.unregisterNodeStatusListener(nodeStatusWatchListener);
deploymentUnitStore.unregisterClusterStatusListener(clusterStatusWatchListener);
+ undeployer.stop();
}
private static void checkId(String id) {
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
index feae04bc88..cf384d391a 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.deployunit;
+import java.util.function.Consumer;
import org.apache.ignite.compute.DeploymentUnit;
/**
@@ -33,10 +34,11 @@ public interface DeploymentUnitAccessor {
DisposableDeploymentUnit acquire(DeploymentUnit unit);
/**
- * Checks if the deployment unit is acquired.
+ * Executes the consumer if the deployment unit is not acquired.
*
* @param unit Deployment unit.
- * @return {@code true} if the deployment unit is acquired.
+ * @param consumer Consumer.
+ * @return {@code true} if the consumer was executed.
*/
- boolean isAcquired(DeploymentUnit unit);
+ boolean computeIfNotAcquired(DeploymentUnit unit, Consumer<DeploymentUnit>
consumer);
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
index 0078c84e94..368bcce408 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImpl.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.deployunit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.internal.util.RefCountedObjectPool;
@@ -26,6 +30,8 @@ import org.apache.ignite.internal.util.RefCountedObjectPool;
public class DeploymentUnitAccessorImpl implements DeploymentUnitAccessor {
private final RefCountedObjectPool<DeploymentUnit,
DisposableDeploymentUnit> pool = new RefCountedObjectPool<>();
+ private final RefCountedObjectPool<DeploymentUnit, Lock> locks = new
RefCountedObjectPool<>();
+
private final FileDeployerService deployer;
public DeploymentUnitAccessorImpl(FileDeployerService deployer) {
@@ -37,19 +43,36 @@ public class DeploymentUnitAccessorImpl implements
DeploymentUnitAccessor {
*/
@Override
public DisposableDeploymentUnit acquire(DeploymentUnit unit) {
- return pool.acquire(unit, ignored -> new DisposableDeploymentUnit(
- unit,
- deployer.unitPath(unit.name(), unit.version(), true),
- () -> pool.release(unit)
- )
- );
+ return executeWithLock(unit, it -> pool.acquire(it, ignored -> new
DisposableDeploymentUnit(
+ it,
+ deployer.unitPath(it.name(), it.version(), true),
+ () -> pool.release(it)
+ )));
}
/**
* {@inheritDoc}
*/
@Override
- public boolean isAcquired(DeploymentUnit unit) {
- return pool.isAcquired(unit);
+ public boolean computeIfNotAcquired(DeploymentUnit unit,
Consumer<DeploymentUnit> consumer) {
+ return executeWithLock(unit, it -> {
+ if (pool.isAcquired(it)) {
+ return false;
+ } else {
+ consumer.accept(it);
+ return true;
+ }
+ });
+ }
+
+ private <O> O executeWithLock(DeploymentUnit unit,
Function<DeploymentUnit, O> function) {
+ Lock lock = locks.acquire(unit, ignored -> new ReentrantLock());
+ lock.lock();
+ try {
+ return function.apply(unit);
+ } finally {
+ lock.unlock();
+ locks.release(unit);
+ }
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiter.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiter.java
new file mode 100644
index 0000000000..b754275eea
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+
+/**
+ * Executes an action on a deployment unit once it is not acquired; acquired units are queued and retried periodically.
+ */
+class DeploymentUnitAcquiredWaiter {
+    private static final IgniteLogger LOG = Loggers.forClass(DeploymentUnitAcquiredWaiter.class);
+
+    /** Units whose action is postponed: they were acquired when checked, or the action failed during a retry. */
+    private final Queue<DeploymentUnit> queue = new ConcurrentLinkedDeque<>();
+
+    /** Accessor used to run the action only if the unit is not acquired. */
+    private final DeploymentUnitAccessor deploymentUnitAccessor;
+
+    /** Single-threaded scheduler that periodically re-processes the queue. */
+    private final ScheduledExecutorService executor;
+
+    /** Action to run on a unit that is not acquired. */
+    private final Consumer<DeploymentUnit> action;
+
+    /**
+     * Creates the waiter; nothing is scheduled until {@link #start(long, TimeUnit)} is called.
+     *
+     * @param nodeName node name, passed to the thread factory of the scheduler.
+     * @param deploymentUnitAccessor deployment unit accessor.
+     * @param action action to run on a unit once it is not acquired.
+     */
+    DeploymentUnitAcquiredWaiter(String nodeName, DeploymentUnitAccessor deploymentUnitAccessor, Consumer<DeploymentUnit> action) {
+        this.deploymentUnitAccessor = deploymentUnitAccessor;
+        this.executor = Executors.newScheduledThreadPool(1, NamedThreadFactory.create(nodeName, "deployment-unit-acquired-waiter", LOG));
+        this.action = action;
+    }
+
+    /**
+     * Starts the periodic processing of the queue; the first pass runs immediately.
+     *
+     * @param delay delay between the end of one pass over the queue and the start of the next one.
+     * @param unit time unit of the delay.
+     */
+    public void start(long delay, TimeUnit unit) {
+        executor.scheduleWithFixedDelay(this::processQueue, 0, delay, unit);
+    }
+
+    /** Stops the processor by shutting the scheduler down. */
+    public void stop() {
+        executor.shutdown();
+    }
+
+    /**
+     * Retries the units that were queued when the pass started. A failing action is logged and its unit is re-queued:
+     * an exception escaping a periodic task would suppress all further passes and lose the unit.
+     */
+    private void processQueue() {
+        DeploymentUnit unit;
+        for (int left = queue.size(); left > 0 && (unit = queue.poll()) != null; left--) {
+            try {
+                submitToAcquireRelease(unit);
+            } catch (RuntimeException e) {
+                LOG.error("Failed to process deployment unit, will retry [unit=" + unit.name() + ':' + unit.version() + ']', e);
+                queue.offer(unit);
+            }
+        }
+    }
+
+    /**
+     * Executes the action on the deployment unit if it is not acquired. Otherwise, puts it to the queue.
+     *
+     * @param unit deployment unit to undeploy.
+     */
+    public void submitToAcquireRelease(DeploymentUnit unit) {
+        if (!deploymentUnitAccessor.computeIfNotAcquired(unit, action)) {
+            queue.offer(unit);
+        }
+    }
+}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImplTest.java
new file mode 100644
index 0000000000..63d9c0b3e1
--- /dev/null
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAccessorImplTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.compute.DeploymentUnit;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DeploymentUnitAccessorImplTest {
+    @Mock
+    private FileDeployerService deployerService;
+
+    @InjectMocks
+    private DeploymentUnitAccessorImpl deploymentUnitAccessor;
+
+    /** The action is skipped while any handle of the unit is outstanding and runs once all of them are released. */
+    @Test
+    void computeIfNotAcquired() {
+        DeploymentUnit target = new DeploymentUnit("unit", "1.0.0");
+        DisposableDeploymentUnit firstHandle = deploymentUnitAccessor.acquire(target);
+        DisposableDeploymentUnit secondHandle = deploymentUnitAccessor.acquire(target);
+        // Both handles are outstanding.
+        assertFalse(deploymentUnitAccessor.computeIfNotAcquired(target, ignored -> {}));
+        firstHandle.release();
+        // The second handle still holds the unit.
+        assertFalse(deploymentUnitAccessor.computeIfNotAcquired(target, ignored -> {}));
+        secondHandle.release();
+        // No handle is left.
+        assertTrue(deploymentUnitAccessor.computeIfNotAcquired(target, ignored -> {}));
+    }
+}
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiterTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiterTest.java
new file mode 100644
index 0000000000..6622a59d4d
--- /dev/null
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentUnitAcquiredWaiterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.util.function.Predicate.isEqual;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DeploymentUnitAcquiredWaiterTest {
+    private static final int DELAY_IN_MILLIS = 500;
+
+    private final Set<DeploymentUnit> removingUnits = new CopyOnWriteArraySet<>();
+
+    @Mock
+    private FileDeployerService deployerService;
+
+    @Spy
+    @InjectMocks
+    private DeploymentUnitAccessorImpl deploymentUnitAccessor;
+
+    private DeploymentUnitAcquiredWaiter undeployer;
+
+    @BeforeEach
+    void setUp() {
+        undeployer = new DeploymentUnitAcquiredWaiter(
+                "testNode",
+                deploymentUnitAccessor,
+                removingUnits::add
+        );
+        undeployer.start(DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    void undeployNotAcquiredUnits() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+        DeploymentUnit unit2 = new DeploymentUnit("unit2", "1.0.0");
+        DeploymentUnit unit3 = new DeploymentUnit("unit3", "1.0.0");
+
+        undeployer.submitToAcquireRelease(unit1);
+        undeployer.submitToAcquireRelease(unit2);
+        undeployer.submitToAcquireRelease(unit3);
+
+        // units that are not acquired are removed synchronously, in submission order.
+        assertThat(removingUnits, contains(unit1, unit2, unit3));
+    }
+
+    @Test
+    void undeployAcquiredUnits() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+        DeploymentUnit unit2 = new DeploymentUnit("unit2", "1.0.0");
+        DeploymentUnit unit3 = new DeploymentUnit("unit3", "1.0.0");
+
+        // all units are acquired and will not be released in the future.
+        deploymentUnitAccessor.acquire(unit1);
+        deploymentUnitAccessor.acquire(unit2);
+        deploymentUnitAccessor.acquire(unit3);
+
+        undeployer.submitToAcquireRelease(unit1);
+        undeployer.submitToAcquireRelease(unit2);
+        undeployer.submitToAcquireRelease(unit3);
+
+        // check that none of the units is removed: "||" fails the test as soon as any single unit gets removed.
+        await().during(DELAY_IN_MILLIS * 5, TimeUnit.MILLISECONDS).until(
+                () -> removingUnits.contains(unit1) || removingUnits.contains(unit2) || removingUnits.contains(unit3),
+                isEqual(false)
+        );
+    }
+
+    @Test
+    void undeployReleasedUnits() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+        DeploymentUnit unit2 = new DeploymentUnit("unit2", "1.0.0");
+        DeploymentUnit unit3 = new DeploymentUnit("unit3", "1.0.0");
+
+        // unit1 and unit3 will be released in the future, unit2 stays acquired.
+        DisposableDeploymentUnit deploymentUnit1 = deploymentUnitAccessor.acquire(unit1);
+        DisposableDeploymentUnit deploymentUnit2 = deploymentUnitAccessor.acquire(unit2);
+        DisposableDeploymentUnit deploymentUnit3 = deploymentUnitAccessor.acquire(unit3);
+
+        undeployer.submitToAcquireRelease(unit1);
+        undeployer.submitToAcquireRelease(unit2);
+        undeployer.submitToAcquireRelease(unit3);
+
+        verify(deploymentUnitAccessor, atLeastOnce()).computeIfNotAcquired(eq(unit1), any());
+        verify(deploymentUnitAccessor, atLeastOnce()).computeIfNotAcquired(eq(unit2), any());
+        verify(deploymentUnitAccessor, atLeastOnce()).computeIfNotAcquired(eq(unit3), any());
+
+        assertThat(removingUnits, emptyIterable());
+
+        deploymentUnit1.release();
+        deploymentUnit3.release();
+
+        // check unit1 and unit3 were removed by one of the following retry passes.
+        await().timeout(DELAY_IN_MILLIS * 4, TimeUnit.MILLISECONDS).until(() -> removingUnits.contains(unit1));
+        await().timeout(DELAY_IN_MILLIS * 4, TimeUnit.MILLISECONDS).until(() -> removingUnits.contains(unit3));
+
+        // check unit2 is still not removed.
+        await().during(DELAY_IN_MILLIS * 4, TimeUnit.MILLISECONDS)
+                .until(() -> removingUnits.contains(unit2), isEqual(false));
+    }
+
+    @Test
+    void notInfinityLoop() {
+        DeploymentUnit unit1 = new DeploymentUnit("unit1", "1.0.0");
+
+        deploymentUnitAccessor.acquire(unit1);
+
+        undeployer.submitToAcquireRelease(unit1);
+
+        // check delay between attempts to undeploy the unit: one direct call plus at most one retry per delay period.
+        verify(deploymentUnitAccessor, after(DELAY_IN_MILLIS * 5).atMost(6))
+                .computeIfNotAcquired(eq(unit1), any());
+    }
+
+    @AfterEach
+    void tearDown() {
+        undeployer.stop();
+    }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
index af861c49f0..182dcddc5d 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapper.java
@@ -24,13 +24,13 @@ import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitNotFoundExc
import
org.apache.ignite.internal.deployunit.exception.DeploymentUnitUnavailableException;
class ClassLoaderExceptionsMapper {
- // <class_fqdn>. Deployment unit <deployment_unit_id_and ver> doesn’t
exist.
- private static final String DEPLOYMENT_UNIT_DOES_NOT_EXIST_MSG = "%s.
Deployment unit %s:%s doesn’t exist";
+ // <class_fqdn>. Deployment unit <deployment_unit_id_and ver> doesn't
exist.
+ private static final String DEPLOYMENT_UNIT_DOES_NOT_EXIST_MSG = "%s.
Deployment unit %s:%s doesn't exist";
- // <class_fqdn>. Deployment unit <deployment_unit_id> can’t be used:
+ // <class_fqdn>. Deployment unit <deployment_unit_id> can't be used:
// [clusterStatus = <clusterDURecord.status>, nodeStatus =
<nodeDURecord.status>].
- private static final String DEPLOYMENT_UNIT_NOT_AVAILABLE_MSG = "%s.
Deployment unit %s:%s can’t be used: "
+ private static final String DEPLOYMENT_UNIT_NOT_AVAILABLE_MSG = "%s.
Deployment unit %s:%s can't be used: "
+ "[clusterStatus = %s, nodeStatus = %s]";
static CompletableFuture<JobContext> mapClassLoaderExceptions(
diff --git a/modules/compute/src/test/README.md
b/modules/compute/src/test/README.md
index 1ca0cd592a..56eda0dcfb 100644
--- a/modules/compute/src/test/README.md
+++ b/modules/compute/src/test/README.md
@@ -1,15 +1,15 @@
# ignite-compute
## Unit tests
-[apache-ignite-1.0-SNAPSHOT-src.zip](resources%2Funits%2Fapache-ignite-1.0-SNAPSHOT-src.zip)
contains a zip archive with a
+[ignite-jobs-1.0-SNAPSHOT-src.zip](resources%2Funits%2Fignite-jobs-1.0-SNAPSHOT-src.zip)
contains a zip archive with a
test project which was used to create jars for
`org.apache.ignite.internal.compute.loader.JobClassLoaderFactoryTest` tests.
-[unit1-1.0-SNAPSHOT.jar](resources%2Funits%2Funit2%2F1.0.0%2Funit1-1.0-SNAPSHOT.jar)
contains two classes
+[ignite-ut-job1-1.0-SNAPSHOT.jar](resources%2Funits%2Funit1%2F1.0.0%2Fignite-ut-job1-1.0-SNAPSHOT.jar)
contains two classes
which are used in tests:
* `org.apache.ignite.internal.compute.unit1.Unit1` - `extends
org.apache.ignite.compute.ComputeJob` and returns 1 as Integer.
* `org.my.job.compute.unit.Job1Utility`
-[unit2-1.0-SNAPSHOT.jar](resources%2Funits%2Funit2%2F2.0.0%2Funit2-1.0-SNAPSHOT.jar)
contains two classes
+[ignite-ut-job2-1.0-SNAPSHOT.jar](resources%2Funits%2Funit1%2F2.0.0%2Fignite-ut-job2-1.0-SNAPSHOT.jar)
contains two classes
which are used in tests:
* `org.apache.ignite.internal.compute.unit1.Unit2` - extends
`org.apache.ignite.compute.ComputeJob` and returns "Hello World!" as String.
* `org.my.job.compute.unit.Job2Utility`
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
index b36269235f..53d2a94c3c 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ClassLoaderExceptionsMapperTest.java
@@ -38,7 +38,7 @@ class ClassLoaderExceptionsMapperTest {
mapClassLoaderExceptions(failedFuture(toBeThrown),
"com.example.Main"),
willThrow(
ClassNotFoundException.class,
- "com.example.Main. Deployment unit id:1.0.0 doesn’t
exist"
+ "com.example.Main. Deployment unit id:1.0.0 doesn't
exist"
)
);
}
@@ -56,7 +56,7 @@ class ClassLoaderExceptionsMapperTest {
mapClassLoaderExceptions(failedFuture(toBeThrown),
"com.example.Main"),
willThrow(
ClassNotFoundException.class,
- "com.example.Main. Deployment unit id:1.0.0 can’t be
used:"
+ "com.example.Main. Deployment unit id:1.0.0 can't be
used:"
+ " [clusterStatus = OBSOLETE, nodeStatus =
REMOVING]"
)
);
diff --git
a/modules/compute/src/test/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
b/modules/compute/src/test/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
similarity index 100%
rename from
modules/compute/src/test/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
rename to
modules/compute/src/test/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
diff --git
a/modules/compute/src/test/resources/units/unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit1/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit1/1.0.0/unit1-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit1/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit1/2.0.0/unit2-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job1-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit1/3.0.1/ignite-ut-job2-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit1-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit1/3.0.1/unit2-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit1/3.0.2/ignite-ut-job1-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/ignite-ut-job2-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit1/3.0.2/subdir/unit2-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit1/3.0.2/unit1-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit2/1.0.0/ignite-ut-job1-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit2/1.0.0/unit1-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/compute/src/test/resources/units/unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and
b/modules/compute/src/test/resources/units/unit2/2.0.0/ignite-ut-job2-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/compute/src/test/resources/units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar
b/modules/compute/src/test/resources/units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files
a/modules/compute/src/test/resources/units/unit2/2.0.0/unit2-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp
b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index cff645019b..7aef5fcda3 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -311,7 +311,7 @@ TEST_F(compute_test, unknown_unit) {
auto cluster_nodes = m_client.get_cluster_nodes();
(void) m_client.get_compute().execute(cluster_nodes,
{{"unknown"}}, NODE_NAME_JOB, {});
} catch (const ignite_error &e) {
- EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:latest doesn’t exist"));
+ EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:latest doesn't exist"));
throw;
}
},
@@ -325,7 +325,7 @@ TEST_F(compute_test, execute_unknown_unit_and_version) {
auto cluster_nodes = m_client.get_cluster_nodes();
(void) m_client.get_compute().execute(cluster_nodes,
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {});
} catch (const ignite_error &e) {
- EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:1.2.3 doesn’t exist"));
+ EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:1.2.3 doesn't exist"));
throw;
}
},
@@ -339,7 +339,7 @@ TEST_F(compute_test,
execute_colocated_unknown_unit_and_version) {
auto comp = m_client.get_compute();
(void) comp.execute_colocated(TABLE_1, get_tuple(1),
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {});
} catch (const ignite_error &e) {
- EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:1.2.3 doesn’t exist"));
+ EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:1.2.3 doesn't exist"));
throw;
}
},
@@ -353,7 +353,7 @@ TEST_F(compute_test, broadcast_unknown_unit_and_version) {
auto &res1 = res[get_node(1)];
ASSERT_TRUE(res1.has_error());
- EXPECT_THAT(res1.error().what_str(), ::testing::HasSubstr("Deployment unit
unknown:1.2.3 doesn’t exist"));
+ EXPECT_THAT(res1.error().what_str(), ::testing::HasSubstr("Deployment unit
unknown:1.2.3 doesn't exist"));
}
TEST_F(compute_test, execute_empty_unit_name) {
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 5a610e3b27..33cce325be 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -350,7 +350,7 @@ namespace Apache.Ignite.Tests.Compute
var ex = Assert.ThrowsAsync<IgniteException>(
async () => await Client.Compute.ExecuteAsync<string>(await
GetNodeAsync(1), deploymentUnits, NodeNameJob));
- StringAssert.Contains("Deployment unit unit-latest:latest doesn’t
exist", ex!.Message);
+ StringAssert.Contains("Deployment unit unit-latest:latest doesn't
exist", ex!.Message);
}
[Test]
@@ -362,7 +362,7 @@ namespace Apache.Ignite.Tests.Compute
var ex = Assert.ThrowsAsync<IgniteException>(
async () => await
Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple,
deploymentUnits, NodeNameJob));
- StringAssert.Contains("Deployment unit unit-latest:latest doesn’t
exist", ex!.Message);
+ StringAssert.Contains("Deployment unit unit-latest:latest doesn't
exist", ex!.Message);
}
[Test]
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 4089cc0768..f79752c390 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.compute;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
import java.io.IOException;
import java.io.InputStream;
@@ -49,7 +52,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
@BeforeEach
void setUp() throws IOException {
- deployJar(node(0), unit.name(), unit.version(),
"ignite-jobs-1.0-SNAPSHOT.jar");
+ deployJar(node(0), unit.name(), unit.version(),
"ignite-it-jobs-1.0-SNAPSHOT.jar");
}
@Override
@@ -103,7 +106,7 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
result,
willThrow(
ClassNotFoundException.class,
- "org.example.ConcatJob. Deployment unit
non-existing:1.0.0 doesn’t exist"
+ "org.example.ConcatJob. Deployment unit
non-existing:1.0.0 doesn't exist"
)
);
}
@@ -115,20 +118,58 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
IgniteImpl entryNode = node(0);
DeploymentUnit firstVersion = new DeploymentUnit("latest-unit",
Version.parseVersion("1.0.0"));
- deployJar(entryNode, firstVersion.name(), firstVersion.version(),
"unit1-1.0-SNAPSHOT.jar");
+ deployJar(entryNode, firstVersion.name(), firstVersion.version(),
"ignite-ut-job1-1.0-SNAPSHOT.jar");
CompletableFuture<Integer> result1 = entryNode.compute()
.execute(Set.of(entryNode.node()), jobUnits,
"org.my.job.compute.unit.UnitJob");
assertThat(result1, willBe(1));
DeploymentUnit secondVersion = new DeploymentUnit("latest-unit",
Version.parseVersion("1.0.1"));
- deployJar(entryNode, secondVersion.name(), secondVersion.version(),
"unit2-1.0-SNAPSHOT.jar");
+ deployJar(entryNode, secondVersion.name(), secondVersion.version(),
"ignite-ut-job2-1.0-SNAPSHOT.jar");
CompletableFuture<String> result2 = entryNode.compute()
.execute(Set.of(entryNode.node()), jobUnits,
"org.my.job.compute.unit.UnitJob");
assertThat(result2, willBe("Hello World!"));
}
+ @Test
+ void undeployAcquiredUnit() {
+ IgniteImpl entryNode = node(0);
+ CompletableFuture<Void> job =
entryNode.compute().execute(Set.of(entryNode.node()), units,
"org.example.SleepJob", 3L);
+
+ assertThat(entryNode.deployment().undeployAsync(unit.name(),
unit.version()), willCompleteSuccessfully());
+
+ assertThat(entryNode.deployment().clusterStatusAsync(unit.name(),
unit.version()), willBe(OBSOLETE));
+ assertThat(entryNode.deployment().nodeStatusAsync(unit.name(),
unit.version()), willBe(OBSOLETE));
+
+ await().failFast("The unit must not be removed until the job is
completed", () -> {
+ assertThat(entryNode.deployment().clusterStatusAsync(unit.name(),
unit.version()), willBe(OBSOLETE));
+ assertThat(entryNode.deployment().nodeStatusAsync(unit.name(),
unit.version()), willBe(OBSOLETE));
+ }).until(() -> job, willCompleteSuccessfully());
+
+ await().until(
+ () -> entryNode.deployment().clusterStatusAsync(unit.name(),
unit.version()),
+ willBe(nullValue())
+ );
+ }
+
+ @Test
+ void executeJobWithObsoleteUnit() {
+ IgniteImpl entryNode = node(0);
+ CompletableFuture<Void> successJob =
entryNode.compute().execute(Set.of(entryNode.node()), units,
"org.example.SleepJob", 2L);
+
+ assertThat(entryNode.deployment().undeployAsync(unit.name(),
unit.version()), willCompleteSuccessfully());
+
+ CompletableFuture<Void> failedJob =
entryNode.compute().execute(Set.of(entryNode.node()), units,
"org.example.SleepJob", 2L);
+
+ assertThat(failedJob, willThrow(
+ ClassNotFoundException.class,
+ "org.example.SleepJob. Deployment unit jobs:1.0.0 can't be
used: "
+ + "[clusterStatus = OBSOLETE, nodeStatus = OBSOLETE]")
+ );
+ assertThat(successJob, willCompleteSuccessfully());
+ }
+
private static void deployJar(IgniteImpl node, String unitId, Version
unitVersion, String jarName) throws IOException {
try (InputStream jarStream =
ItComputeTestStandalone.class.getClassLoader().getResourceAsStream("units/" +
jarName)) {
CompletableFuture<Boolean> deployed =
node.deployment().deployAsync(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 72178a5b19..6912c90e76 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -215,7 +215,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
NodeNameJob.class.getName()).join());
var cause = (IgniteException) ex.getCause();
- assertThat(cause.getMessage(), containsString("Deployment unit
u:latest doesn’t exist"));
+ assertThat(cause.getMessage(), containsString("Deployment unit
u:latest doesn't exist"));
// TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does
not propagate to client.
assertEquals(INTERNAL_ERR, cause.code());
@@ -232,7 +232,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
NodeNameJob.class.getName()).join());
var cause = (IgniteException) ex.getCause();
- assertThat(cause.getMessage(), containsString("Deployment unit
u:latest doesn’t exist"));
+ assertThat(cause.getMessage(), containsString("Deployment unit
u:latest doesn't exist"));
// TODO IGNITE-19823 DeploymentUnitNotFoundException is internal, does
not propagate to client.
assertEquals(INTERNAL_ERR, cause.code());
diff --git
a/modules/runner/src/integrationTest/resources/units/ignite-it-jobs-1.0-SNAPSHOT.jar
b/modules/runner/src/integrationTest/resources/units/ignite-it-jobs-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..7700570180
Binary files /dev/null and
b/modules/runner/src/integrationTest/resources/units/ignite-it-jobs-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/runner/src/integrationTest/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
b/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
similarity index 100%
rename from
modules/runner/src/integrationTest/resources/units/apache-ignite-1.0-SNAPSHOT-src.zip
rename to
modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT-src.zip
diff --git
a/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT.jar
b/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT.jar
deleted file mode 100644
index e15cca0bf1..0000000000
Binary files
a/modules/runner/src/integrationTest/resources/units/ignite-jobs-1.0-SNAPSHOT.jar
and /dev/null differ
diff --git
a/modules/runner/src/integrationTest/resources/units/ignite-ut-job1-1.0-SNAPSHOT.jar
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..1c33f9ceec
Binary files /dev/null and
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job1-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/runner/src/integrationTest/resources/units/ignite-ut-job2-1.0-SNAPSHOT.jar
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job2-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000..e4d23cf045
Binary files /dev/null and
b/modules/runner/src/integrationTest/resources/units/ignite-ut-job2-1.0-SNAPSHOT.jar
differ
diff --git
a/modules/runner/src/integrationTest/resources/units/unit1-1.0-SNAPSHOT.jar
b/modules/runner/src/integrationTest/resources/units/unit1-1.0-SNAPSHOT.jar
deleted file mode 100644
index ca829eaffc..0000000000
Binary files
a/modules/runner/src/integrationTest/resources/units/unit1-1.0-SNAPSHOT.jar and
/dev/null differ
diff --git
a/modules/runner/src/integrationTest/resources/units/unit2-1.0-SNAPSHOT.jar
b/modules/runner/src/integrationTest/resources/units/unit2-1.0-SNAPSHOT.jar
deleted file mode 100644
index 7925c04ef4..0000000000
Binary files
a/modules/runner/src/integrationTest/resources/units/unit2-1.0-SNAPSHOT.jar and
/dev/null differ