This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 8a38bc8b901bbcf12ae7d8066e12eb0b200e639c Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Mon Feb 6 13:21:33 2023 -0500 Distributing transfer load uniformly across qualified agents --- .../apache/airavata/mft/admin/MFTConsulClient.java | 2 +- .../mft/controller/TransferDispatcher.java | 38 +++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java index 35cb4c3..273e6c4 100644 --- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java +++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java @@ -257,7 +257,7 @@ public class MFTConsulClient { * @return * @throws MFTConsulClientException */ - public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws MFTConsulClientException { + public List<String> getAgentActiveTransfers(AgentInfo agentInfo) throws MFTConsulClientException { try { List<String> keys = kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + "/" + agentInfo.getSessionId()); return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList()); diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java index bf3b153..3e3b04d 100644 --- a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java +++ b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java @@ -18,6 +18,8 @@ package org.apache.airavata.mft.controller; import org.apache.airavata.mft.admin.MFTConsulClient; +import org.apache.airavata.mft.admin.MFTConsulClientException; +import org.apache.airavata.mft.admin.models.AgentInfo; import org.apache.airavata.mft.admin.models.TransferState; import org.apache.airavata.mft.agent.stub.AgentTransferRequest; import org.apache.airavata.mft.api.service.EndpointPaths; @@ -70,7 +72,7 @@ public class TransferDispatcher { .setDescription("Initializing the transfer")); // TODO use a better way to select the right agent - mftConsulClient.commandTransferToAgent(filteredAgents.get(0), transferId, agentTransferRequest); + mftConsulClient.commandTransferToAgent(selectTargetAgent(filteredAgents), transferId, agentTransferRequest); mftConsulClient.markTransferAsProcessed(transferId, transferRequest); logger.info("Marked transfer {} as processed", transferId); @@ -93,6 +95,40 @@ public class TransferDispatcher { } } + + private String selectTargetAgent(List<String> liveAgentIds) throws MFTConsulClientException { + String selectedAgent = null; + List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map( + id -> mftConsulClient.getAgentInfo(id)).collect(Collectors.toList()); + int transferCount = -1; + List<String> candidates = new ArrayList<>(); + + for (Optional<AgentInfo> agentInfo : agentInfos) { + if (agentInfo.isPresent()) { + List<String> agentActiveTransfers = mftConsulClient.getAgentActiveTransfers(agentInfo.get()); + logger.info("Agent {} has transfers assigned {}", agentInfo.get().getId(), agentActiveTransfers.size()); + if (transferCount == -1) { + transferCount = agentActiveTransfers.size(); + candidates.add(agentInfo.get().getId()); + } else if (transferCount == agentActiveTransfers.size()) { + candidates.add(agentInfo.get().getId()); + } else if (transferCount > agentActiveTransfers.size()) { + candidates = new ArrayList<>(); + transferCount = agentActiveTransfers.size(); + candidates.add(agentInfo.get().getId()); + } + } + } + + if (candidates.size() > 0) { + Random rand = new Random(); + selectedAgent = candidates.get(rand.nextInt(candidates.size())); + logger.info("Selecting agent {}", selectedAgent); + } + + return selectedAgent; + } + public void handleTransferRequest(String transferId, TransferApiRequest transferRequest, AgentTransferRequest.Builder agentTransferRequestTemplate,
