This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3d4e1341a9 IGNITE-20630 Select only available nodes for deployment unit download (#2713)
3d4e1341a9 is described below
commit 3d4e1341a98976556f1bad9838ed09b1fac1ff28
Author: Mikhail <[email protected]>
AuthorDate: Thu Oct 19 14:55:43 2023 +0300
IGNITE-20630 Select only available nodes for deployment unit download (#2713)
---
.../internal/deployunit/DeployMessagingService.java | 19 ++++++++++++++++---
.../exception/DeploymentUnitReadException.java | 9 +++++++++
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
index 0584c5b6d0..6ef69eb630 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployMessagingService.java
@@ -18,10 +18,11 @@
package org.apache.ignite.internal.deployunit;
import java.util.List;
-import java.util.Random;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.version.Version;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
import org.apache.ignite.internal.deployunit.message.DownloadUnitRequest;
import org.apache.ignite.internal.deployunit.message.DownloadUnitRequestImpl;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.deployunit.message.StopDeployResponseImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
/**
@@ -106,17 +108,28 @@ public class DeployMessagingService {
* @return Downloaded deployment unit content.
*/
CompletableFuture<UnitContent> downloadUnitContent(String id, Version
version, List<String> nodes) {
- String node = nodes.get(new Random().nextInt(nodes.size()));
DownloadUnitRequest request = DownloadUnitRequestImpl.builder()
.id(id)
.version(version.render())
.build();
+ ClusterNode clusterNode = resolveClusterNode(nodes);
+
return clusterService.messagingService()
- .invoke(clusterService.topologyService().getByConsistentId(node), DEPLOYMENT_CHANNEL, request, Long.MAX_VALUE)
+ .invoke(clusterNode, DEPLOYMENT_CHANNEL, request, Long.MAX_VALUE)
.thenApply(message -> ((DownloadUnitResponse)
message).unitContent());
}
+ private ClusterNode resolveClusterNode(List<String> nodes) {
+ return nodes.stream().map(node -> clusterService.topologyService().getByConsistentId(node))
+ .filter(Objects::nonNull)
+ .findAny()
+ .orElseThrow(() -> {
+ LOG.error("No any available node for download unit from {}", nodes);
+ return new DeploymentUnitReadException("No any available node for download unit from " + nodes);
+ });
+ }
+
/**
* Stop all in-progress deployment processes for deployment unit with
provided id and version.
*
diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitReadException.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitReadException.java
index a985769e55..42ee54af24 100644
--- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitReadException.java
+++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/exception/DeploymentUnitReadException.java
@@ -39,4 +39,13 @@ public class DeploymentUnitReadException extends IgniteException {
public DeploymentUnitReadException(Throwable cause) {
super(CodeDeployment.UNIT_CONTENT_READ_ERR, cause);
}
+
+ /**
+ * Constructor.
+ *
+ * @param message error message.
+ */
+ public DeploymentUnitReadException(String message) {
+ super(CodeDeployment.UNIT_CONTENT_READ_ERR, message);
+ }
}