This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git

commit 8a38bc8b901bbcf12ae7d8066e12eb0b200e639c
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Mon Feb 6 13:21:33 2023 -0500

    Distributing transfer load uniformly across qualified agents
---
 .../apache/airavata/mft/admin/MFTConsulClient.java |  2 +-
 .../mft/controller/TransferDispatcher.java         | 38 +++++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index 35cb4c3..273e6c4 100644
--- 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -257,7 +257,7 @@ public class MFTConsulClient {
      * @return
      * @throws MFTConsulClientException
      */
-    public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws 
MFTConsulClientException {
+    public List<String> getAgentActiveTransfers(AgentInfo agentInfo) throws 
MFTConsulClientException {
         try {
             List<String> keys = 
kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + 
"/" + agentInfo.getSessionId());
             return keys.stream().map(key -> key.substring(key.lastIndexOf("/") 
+ 1)).collect(Collectors.toList());
diff --git 
a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
 
b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
index bf3b153..3e3b04d 100644
--- 
a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
+++ 
b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
@@ -18,6 +18,8 @@
 package org.apache.airavata.mft.controller;
 
 import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
 import org.apache.airavata.mft.api.service.EndpointPaths;
@@ -70,7 +72,7 @@ public class TransferDispatcher {
                     .setDescription("Initializing the transfer"));
 
             // TODO use a better way to select the right agent
-            mftConsulClient.commandTransferToAgent(filteredAgents.get(0), 
transferId, agentTransferRequest);
+            
mftConsulClient.commandTransferToAgent(selectTargetAgent(filteredAgents), 
transferId, agentTransferRequest);
             mftConsulClient.markTransferAsProcessed(transferId, 
transferRequest);
             logger.info("Marked transfer {} as processed", transferId);
 
@@ -93,6 +95,40 @@ public class TransferDispatcher {
         }
     }
 
+
+    private String selectTargetAgent(List<String> liveAgentIds) throws 
MFTConsulClientException { // Picks one id from liveAgentIds: the agent with the fewest entries from getAgentActiveTransfers wins, ties are broken at random
+        String selectedAgent = null; // stays null when no candidate is found
+        List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(
+                id -> 
mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
+        int transferCount = -1; // lowest transfer count seen so far; -1 means no agent inspected yet
+        List<String> candidates = new ArrayList<>(); // ids of the agents tied at transferCount
+
+        for (Optional<AgentInfo> agentInfo : agentInfos) {
+            if (agentInfo.isPresent()) { // ids for which no agent info was returned are skipped
+                List<String> agentActiveTransfers = 
mftConsulClient.getAgentActiveTransfers(agentInfo.get());
+                logger.info("Agent {} has transfers assigned {}", 
agentInfo.get().getId(), agentActiveTransfers.size());
+                if (transferCount == -1) { // first agent with info seeds the minimum
+                    transferCount = agentActiveTransfers.size();
+                    candidates.add(agentInfo.get().getId());
+                } else if (transferCount == agentActiveTransfers.size()) { // ties the current minimum: joins the candidates
+                    candidates.add(agentInfo.get().getId());
+                } else if (transferCount > agentActiveTransfers.size()) { // strictly fewer transfers: restart the candidate list
+                    candidates = new ArrayList<>();
+                    transferCount = agentActiveTransfers.size();
+                    candidates.add(agentInfo.get().getId());
+                }
+            }
+        }
+
+        if (candidates.size() > 0) { // with no candidates the null default is returned
+            Random rand = new Random();
+            selectedAgent = candidates.get(rand.nextInt(candidates.size())); // random pick among the tied agents
+            logger.info("Selecting agent {}", selectedAgent);
+        }
+
+        return selectedAgent; // NOTE(review): may be null; the commandTransferToAgent call site does not appear to null-check it — confirm
+    }
+
     public void handleTransferRequest(String transferId,
                                       TransferApiRequest transferRequest,
                                       AgentTransferRequest.Builder 
agentTransferRequestTemplate,

Reply via email to