This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eb05ff2934 IGNITE-24217 Running multiple compute jobs fails (#5057)
eb05ff2934 is described below
commit eb05ff29347bc5ab63c9284cac0ff4946407ebd6
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Jan 16 16:53:21 2025 +0300
IGNITE-24217 Running multiple compute jobs fails (#5057)
---
.../internal/deployunit/DefaultNodeCallback.java | 52 ++--------
.../deployunit/DeployMessagingService.java | 6 +-
.../internal/deployunit/DeploymentManagerImpl.java | 69 +++++++------
.../internal/deployunit/DownloadTracker.java | 5 +-
.../ignite/internal/deployunit/UnitDownloader.java | 110 +++++++++++++++++++++
.../ignite/internal/deployunit/UnitStatus.java | 8 +-
.../metastore/status/UnitClusterStatus.java | 6 ++
.../metastore/status/UnitNodeStatus.java | 6 ++
.../ignite/internal/compute/ItComputeBaseTest.java | 45 ++++-----
.../internal/compute/ItComputeTestStandalone.java | 47 ++++++++-
10 files changed, 239 insertions(+), 115 deletions(-)
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
index 3b9a805cbb..748490a54e 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DefaultNodeCallback.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.deployunit;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,79 +31,42 @@ import
org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
/**
* Default implementation of {@link NodeEventCallback}.
*/
public class DefaultNodeCallback extends NodeEventCallback {
- private static final IgniteLogger LOG =
Loggers.forClass(DefaultNodeCallback.class);
-
private final DeploymentUnitStore deploymentUnitStore;
- private final DeployMessagingService messaging;
-
- private final FileDeployerService deployer;
-
private final DeploymentUnitAcquiredWaiter undeployer;
- private final DownloadTracker tracker;
+ private final UnitDownloader downloader;
private final ClusterManagementGroupManager cmgManager;
- private final String nodeName;
-
/**
* Constructor.
*
* @param deploymentUnitStore Deployment units store.
- * @param messaging Deployment messaging service.
- * @param deployer Deployment unit file system service.
* @param undeployer Deployment unit undeployer.
+ * @param downloader Unit downloader.
* @param cmgManager Cluster management group manager.
- * @param nodeName Node consistent ID.
*/
- public DefaultNodeCallback(
+ DefaultNodeCallback(
DeploymentUnitStore deploymentUnitStore,
- DeployMessagingService messaging,
- FileDeployerService deployer,
DeploymentUnitAcquiredWaiter undeployer,
- DownloadTracker tracker,
- ClusterManagementGroupManager cmgManager,
- String nodeName
+ UnitDownloader downloader,
+ ClusterManagementGroupManager cmgManager
) {
this.deploymentUnitStore = deploymentUnitStore;
- this.messaging = messaging;
- this.deployer = deployer;
this.undeployer = undeployer;
- this.tracker = tracker;
+ this.downloader = downloader;
this.cmgManager = cmgManager;
- this.nodeName = nodeName;
}
@Override
public void onUploading(String id, Version version, List<UnitNodeStatus>
holders) {
- tracker.track(id, version,
- () -> messaging.downloadUnitContent(id, version, new
ArrayList<>(getDeployedNodeIds(holders)))
- .thenCompose(content -> {
-
org.apache.ignite.internal.deployunit.DeploymentUnit unit =
UnitContent.toDeploymentUnit(content);
- return deployer.deploy(id, version, unit)
- .whenComplete((deployed, err) -> {
- try {
- unit.close();
- } catch (Exception e) {
- LOG.error("Failed to close
deployment unit", e);
- }
- });
- })
- .thenApply(deployed -> {
- if (deployed) {
- return
deploymentUnitStore.updateNodeStatus(nodeName, id, version, DEPLOYED);
- }
- return deployed;
- })
- );
+ downloader.downloadUnit(id, version, getDeployedNodeIds(holders));
}
@Override
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index f42602043e..97d98dff11 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.deployunit;
import static java.util.concurrent.CompletableFuture.allOf;
-import java.util.List;
+import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.deployment.version.Version;
@@ -109,7 +109,7 @@ public class DeployMessagingService {
* @param nodes Nodes where unit deployed.
* @return Downloaded deployment unit content.
*/
- CompletableFuture<UnitContent> downloadUnitContent(String id, Version
version, List<String> nodes) {
+ CompletableFuture<UnitContent> downloadUnitContent(String id, Version
version, Collection<String> nodes) {
DownloadUnitRequest request = messageFactory.downloadUnitRequest()
.id(id)
.version(version.render())
@@ -122,7 +122,7 @@ public class DeployMessagingService {
.thenApply(message -> ((DownloadUnitResponse)
message).unitContent());
}
- private ClusterNode resolveClusterNode(List<String> nodes) {
+ private ClusterNode resolveClusterNode(Collection<String> nodes) {
return nodes.stream().map(node ->
clusterService.topologyService().getByConsistentId(node))
.filter(Objects::nonNull)
.findAny()
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
index 3183e5a7c6..465ea7ef2e 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
-import static
org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
@@ -34,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
import org.apache.ignite.internal.deployunit.metastore.NodeEventCallback;
import org.apache.ignite.internal.deployunit.metastore.NodeStatusWatchListener;
import
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
@@ -128,6 +129,8 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
private final ClusterStatusWatchListener clusterStatusWatchListener;
+ private final UnitDownloader unitDownloader;
+
/**
* Constructor.
*
@@ -161,16 +164,9 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
unit -> deploymentUnitStore.updateNodeStatus(nodeName,
unit.name(), unit.version(), REMOVING)
);
messaging = new DeployMessagingService(clusterService, cmgManager,
deployer, tracker);
+ unitDownloader = new UnitDownloader(deploymentUnitStore, nodeName,
deployer, tracker, messaging);
- nodeStatusCallback = new DefaultNodeCallback(
- deploymentUnitStore,
- messaging,
- deployer,
- undeployer,
- tracker,
- cmgManager,
- nodeName
- );
+ nodeStatusCallback = new DefaultNodeCallback(deploymentUnitStore,
undeployer, unitDownloader, cmgManager);
nodeStatusWatchListener = new
NodeStatusWatchListener(deploymentUnitStore, nodeName, nodeStatusCallback);
clusterEventCallback = new
ClusterEventCallbackImpl(deploymentUnitStore, deployer, cmgManager, nodeName);
@@ -353,30 +349,41 @@ public class DeploymentManagerImpl implements
IgniteDeployment {
@Override
public CompletableFuture<Boolean> onDemandDeploy(String id, Version
version) {
- return deploymentUnitStore.getAllNodes(id, version)
- .thenCompose(nodes -> {
- if (nodes.isEmpty()) {
+ return deploymentUnitStore.getAllNodeStatuses(id, version)
+ .thenCompose(statuses -> {
+ if (statuses.isEmpty()) {
return falseCompletedFuture();
}
- if (nodes.contains(nodeName)) {
- return trueCompletedFuture();
- }
- return messaging.downloadUnitContent(id, version, nodes)
- .thenCompose(content -> {
- return
deploymentUnitStore.getClusterStatus(id, version)
- .thenCompose(status -> {
- DeploymentUnit unit =
toDeploymentUnit(content);
- return deployToLocalNode(status,
unit)
- .whenComplete((deployed,
throwable) -> {
- try {
- unit.close();
- } catch (Exception e) {
- LOG.error("Error
closing deployment unit", e);
- }
- });
- });
- });
+ Optional<UnitNodeStatus> nodeStatus = statuses.stream()
+ .filter(status -> status.nodeId().equals(nodeName))
+ .findFirst();
+
+ if (nodeStatus.isPresent()) {
+ switch (nodeStatus.get().status()) {
+ case UPLOADING:
+ // Wait for the upload
+ LOG.debug("Status is UPLOADING, downloading
the unit");
+ return unitDownloader.downloadUnit(statuses,
id, version);
+ case DEPLOYED:
+ // Unit is already deployed on the local node.
+ LOG.debug("Status is DEPLOYED");
+ return trueCompletedFuture();
+ default:
+ // Invalid status, cluster status should be
deployed
+ LOG.debug("Invalid status {}",
nodeStatus.get().status());
+ return falseCompletedFuture();
+ }
+ } else {
+ // Node was not in the initial deploy list, create
status in the UPLOADING state and wait for the upload.
+ return deploymentUnitStore.getClusterStatus(id,
version).thenCompose(clusterStatus ->
+ deploymentUnitStore.createNodeStatus(nodeName,
id, version, clusterStatus.opId(), UPLOADING)
+ .thenCompose(created -> {
+ LOG.debug("Status {}, downloading
the unit", created ? "created" : "not created");
+ return
unitDownloader.downloadUnit(statuses, id, version);
+ })
+ );
+ }
});
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
index d5cf07dfe6..25ef8458a2 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DownloadTracker.java
@@ -44,7 +44,10 @@ public class DownloadTracker {
*/
public <T> CompletableFuture<T> track(String id, Version version,
Supplier<CompletableFuture<T>> trackableAction) {
ClusterStatusKey key =
ClusterStatusKey.builder().id(id).version(version).build();
- return (CompletableFuture<T>) inFlightFutures.computeIfAbsent(key, k
-> trackableAction.get());
+ return ((CompletableFuture<T>) inFlightFutures.computeIfAbsent(key,
+ k -> trackableAction.get()
+ .whenComplete((result, throwable) ->
inFlightFutures.remove(key))
+ ));
}
/**
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
new file mode 100644
index 0000000000..a1c472710e
--- /dev/null
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitDownloader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static
org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStore;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * Unit downloader.
+ */
+class UnitDownloader {
+ private static final IgniteLogger LOG =
Loggers.forClass(UnitDownloader.class);
+
+ private final DeploymentUnitStore deploymentUnitStore;
+
+ private final String nodeName;
+
+ private final FileDeployerService deployer;
+
+ private final DownloadTracker tracker;
+
+ private final DeployMessagingService messaging;
+
+ UnitDownloader(
+ DeploymentUnitStore deploymentUnitStore,
+ String nodeName,
+ FileDeployerService deployer,
+ DownloadTracker tracker,
+ DeployMessagingService messaging
+ ) {
+ this.deploymentUnitStore = deploymentUnitStore;
+ this.nodeName = nodeName;
+ this.deployer = deployer;
+ this.tracker = tracker;
+ this.messaging = messaging;
+ }
+
+ /**
+ * Downloads specified unit from any node where this unit is deployed from
the specified collection of nodes to the local node, deploys
+ * it and sets the node status to {@link DeploymentStatus#DEPLOYED}.
+ *
+ * @param statuses Collection of all node statuses for this unit.
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ */
+ CompletableFuture<Boolean> downloadUnit(Collection<UnitNodeStatus>
statuses, String id, Version version) {
+ List<String> deployedNodes = statuses.stream()
+ .filter(status -> status.status() == DEPLOYED)
+ .map(UnitNodeStatus::nodeId)
+ .collect(Collectors.toList());
+
+ return downloadUnit(id, version, deployedNodes);
+ }
+
+ /**
+ * Downloads specified unit from any node from the specified collection of
nodes to the local node, deploys it and sets the node status
+ * to {@link DeploymentStatus#DEPLOYED}.
+ *
+ * @param id Deployment unit identifier.
+ * @param version Deployment unit version.
+ * @param nodes Nodes where the unit is deployed.
+ */
+ CompletableFuture<Boolean> downloadUnit(String id, Version version,
Collection<String> nodes) {
+ return tracker.track(id, version, () ->
messaging.downloadUnitContent(id, version, nodes)
+ .thenCompose(content -> {
+ DeploymentUnit unit = toDeploymentUnit(content);
+ return deployer.deploy(id, version, unit)
+ .whenComplete((deployed, throwable) -> {
+ try {
+ unit.close();
+ } catch (Exception e) {
+ LOG.error("Error closing deployment unit",
e);
+ }
+ });
+ })
+ .thenCompose(deployed -> {
+ if (deployed) {
+ return deploymentUnitStore.updateNodeStatus(nodeName,
id, version, DEPLOYED);
+ }
+ return falseCompletedFuture();
+ })
+ );
+ }
+}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
index ef26699e53..68a94283a9 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.deployunit;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.tostring.S;
/**
* Unit meta data class.
@@ -122,11 +123,6 @@ public abstract class UnitStatus {
@Override
public String toString() {
- return "UnitStatus{"
- + "id='" + id + '\''
- + ", version=" + version
- + ", status=" + status
- + ", opId=" + opId
- + '}';
+ return S.toString(UnitStatus.class, this);
}
}
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
index cc09aa6a8e..df99f24233 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -87,6 +88,11 @@ public class UnitClusterStatus extends UnitStatus {
return result;
}
+ @Override
+ public String toString() {
+ return S.toString(UnitClusterStatus.class, this, super.toString());
+ }
+
/**
* Serialize unit cluster status to byte array.
*
diff --git
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
index ace6ca68f8..2ffe8c9e30 100644
---
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
+++
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.deployunit.DeploymentStatus;
import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -77,6 +78,11 @@ public class UnitNodeStatus extends UnitStatus {
return result;
}
+ @Override
+ public String toString() {
+ return S.toString(UnitNodeStatus.class, this, super.toString());
+ }
+
/**
* Serialize unit node status to byte array.
*
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 93eaa645a5..a67692a59d 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -275,7 +275,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
IgniteException ex = assertThrows(IgniteException.class, () ->
compute().execute(
JobTarget.node(clusterNode(entryNode)),
-
JobDescriptor.builder(failingJobClassName()).units(units()).build(), null));
+
JobDescriptor.builder(failingJobClass()).units(units()).build(), null));
assertComputeException(ex, "JobException", "Oops");
}
@@ -286,7 +286,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
JobExecution<String> execution = submit(
JobTarget.node(clusterNode(entryNode)),
- JobDescriptor.<Object,
String>builder(failingJobClassName()).units(units()).build(),
+
JobDescriptor.builder(failingJobClass()).units(units()).build(),
null
);
@@ -301,7 +301,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
void executesFailingJobOnRemoteNodes() {
IgniteException ex = assertThrows(IgniteException.class, () ->
compute().execute(
JobTarget.anyNode(clusterNode(node(1)), clusterNode(node(2))),
-
JobDescriptor.builder(failingJobClassName()).units(units()).build(), null));
+
JobDescriptor.builder(failingJobClass()).units(units()).build(), null));
assertComputeException(ex, "JobException", "Oops");
}
@@ -323,7 +323,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
void executesFailingJobOnRemoteNodesAsync() {
JobExecution<String> execution = submit(
JobTarget.anyNode(clusterNode(node(1)), clusterNode(node(2))),
- JobDescriptor.<Object,
String>builder(failingJobClassName()).units(units()).build(),
+
JobDescriptor.builder(failingJobClass()).units(units()).build(),
null
);
@@ -376,7 +376,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
void broadcastsFailingJob() {
BroadcastExecution<String> broadcastExecution = submit(
Set.of(clusterNode(node(0)), clusterNode(node(1)),
clusterNode(node(2))),
- JobDescriptor.<Object,
String>builder(failingJobClassName()).units(units()).build(),
+
JobDescriptor.builder(failingJobClass()).units(units()).build(),
null
);
@@ -435,7 +435,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
var ex = assertThrows(CompletionException.class,
() -> compute().submitAsync(
JobTarget.colocated("BAD_TABLE",
Tuple.create(Map.of("k", 1))),
-
JobDescriptor.builder(getNodeNameJobClassName()).units(units()).build(),
+
JobDescriptor.builder(getNodeNameJobClass()).units(units()).build(),
null
).join()
);
@@ -568,10 +568,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
@Test
void submitMapReduce() {
- List<DeploymentUnit> units = units();
- @Nullable List<DeploymentUnit> arg = units();
TaskExecution<Integer> taskExecution = compute().submitMapReduce(
- TaskDescriptor.<List<DeploymentUnit>,
Integer>builder(mapReduceTaskClassName()).units(units).build(), arg);
+
TaskDescriptor.builder(mapReduceTaskClass()).units(units()).build(),
+ units()
+ );
int sumOfNodeNamesLengths =
CLUSTER.runningNodes().map(Ignite::name).map(String::length).reduce(Integer::sum).orElseThrow();
assertThat(taskExecution.resultAsync(), willBe(sumOfNodeNamesLengths));
@@ -587,8 +587,9 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
@Test
void executeMapReduceAsync() {
CompletableFuture<Integer> future = compute().executeMapReduceAsync(
- TaskDescriptor.<List<DeploymentUnit>,
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
- units());
+
TaskDescriptor.builder(mapReduceTaskClass()).units(units()).build(),
+ units()
+ );
int sumOfNodeNamesLengths =
CLUSTER.runningNodes().map(Ignite::name).map(String::length).reduce(Integer::sum).orElseThrow();
assertThat(future, willBe(sumOfNodeNamesLengths));
@@ -596,9 +597,7 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
@Test
void executeMapReduce() {
- int result = compute().executeMapReduce(
- TaskDescriptor.<List<DeploymentUnit>,
Integer>builder(mapReduceTaskClassName()).units(units()).build(),
- units());
+ int result =
compute().executeMapReduce(TaskDescriptor.builder(mapReduceTaskClass()).units(units()).build(),
units());
int sumOfNodeNamesLengths =
CLUSTER.runningNodes().map(Ignite::name).map(String::length).reduce(Integer::sum).orElseThrow();
assertThat(result, is(sumOfNodeNamesLengths));
@@ -752,28 +751,20 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
).map(Arguments::of);
}
- static String toStringJobClassName() {
- return ToStringJob.class.getName();
- }
-
- private static Class<ToStringJob> toStringJobClass() {
+ static Class<ToStringJob> toStringJobClass() {
return ToStringJob.class;
}
- private static String getNodeNameJobClassName() {
- return GetNodeNameJob.class.getName();
- }
-
private static Class<GetNodeNameJob> getNodeNameJobClass() {
return GetNodeNameJob.class;
}
- private static String failingJobClassName() {
- return FailingJob.class.getName();
+ private static Class<FailingJob> failingJobClass() {
+ return FailingJob.class;
}
- private static String mapReduceTaskClassName() {
- return MapReduce.class.getName();
+ private static Class<MapReduce> mapReduceTaskClass() {
+ return MapReduce.class;
}
static void assertComputeException(Exception ex, Throwable cause) {
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 8159eb7b74..27fe655a72 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -20,20 +20,29 @@ package org.apache.ignite.internal.compute;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.oneOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobDescriptor;
@@ -56,6 +65,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
private final List<DeploymentUnit> units = List.of(unit);
+ @Override
+ protected int[] cmgMetastoreNodes() {
+ return new int[] { 0, 1 }; // Majority will be 0, 1
+ }
+
@BeforeEach
void deploy() throws IOException {
deployJar(node(0), unit.name(), unit.version(),
"ignite-integration-test-jobs-1.0-SNAPSHOT.jar");
@@ -81,6 +95,34 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
return units;
}
+ @Test
+ void executeMultipleRemoteJobs() {
+ IgniteImpl ignite = unwrapIgniteImpl(node(0));
+ CompletableFuture<Set<String>> majority =
ignite.clusterManagementGroupManager().majority();
+ assertThat(majority, will(contains(node(0).name(), node(1).name())));
+
+
assertThat(unwrapIgniteImpl(node(0)).deployment().nodeStatusAsync(unit.name(),
unit.version()), willBe(DEPLOYED));
+
assertThat(unwrapIgniteImpl(node(1)).deployment().nodeStatusAsync(unit.name(),
unit.version()), willBe(oneOf(UPLOADING, DEPLOYED)));
+
assertThat(unwrapIgniteImpl(node(2)).deployment().nodeStatusAsync(unit.name(),
unit.version()), willBe(nullValue()));
+
+ // Execute concurrently on majority, unit should be in the uploading
or deployed state at this point
+ List<CompletableFuture<String>> resultsMajority = IntStream.range(0,
3).mapToObj(i -> compute().executeAsync(
+ JobTarget.node(clusterNode(1)),
+
JobDescriptor.builder(toStringJobClass()).units(units()).build(),
+ 42
+ )).collect(Collectors.toList());
+
+ // Execute concurrently on non-majority, unit is missing, will trigger
on-demand deploy
+ List<CompletableFuture<String>> resultsMissing = IntStream.range(0,
3).mapToObj(i -> compute().executeAsync(
+ JobTarget.node(clusterNode(2)),
+
JobDescriptor.builder(toStringJobClass()).units(units()).build(),
+ 42
+ )).collect(Collectors.toList());
+
+ assertThat(resultsMajority, everyItem(will(equalTo("42"))));
+ assertThat(resultsMissing, everyItem(will(equalTo("42"))));
+ }
+
@Test
void executesJobWithNonExistingUnit() {
Ignite entryNode = node(0);
@@ -88,8 +130,9 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
List<DeploymentUnit> nonExistingUnits = List.of(new
DeploymentUnit("non-existing", "1.0.0"));
CompletableFuture<String> result = entryNode.compute().executeAsync(
JobTarget.node(clusterNode(entryNode)),
- JobDescriptor.<Object[],
String>builder(toStringJobClassName()).units(nonExistingUnits).build(),
- null);
+
JobDescriptor.builder(toStringJobClass()).units(nonExistingUnits).build(),
+ null
+ );
CompletionException ex0 = assertThrows(CompletionException.class,
result::join);