This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 555fc42  Load balancing transfer requests across over agents
555fc42 is described below

commit 555fc422372e735658caf002a5e85aa777e0b3c6
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Tue May 17 11:36:16 2022 -0400

    Load balancing transfer requests across over agents
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    |  1 +
 .../apache/airavata/mft/admin/MFTConsulClient.java | 50 ++++++++++++++++------
 .../airavata/mft/admin/models/AgentInfo.java       | 10 +++++
 .../airavata/mft/controller/MFTController.java     | 17 +++++++-
 4 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java 
b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 18e7ba6..9490225 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -366,6 +366,7 @@ public class MFTAgent implements CommandLineRunner {
                     .setId(agentId)
                     .setHost(agentHost)
                     .setUser(agentUser)
+                    .setSessionId(this.session)
                     
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
                     .setLocalStorages(new ArrayList<>()));
         }
diff --git 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index e46fc9b..8fdeaaf 100644
--- 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -207,6 +207,27 @@ public class MFTConsulClient {
         }
     }
 
+    /**
+     * Lists all currently processing transfer id for the given agent
+     *
+     * @param agentInfo
+     * @return
+     * @throws MFTConsulClientException
+     */
+    public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws 
MFTConsulClientException {
+        try {
+            List<String> keys = 
kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + 
"/" + agentInfo.getSessionId());
+            return keys.stream().map(key -> key.substring(key.lastIndexOf("/") 
+ 1)).collect(Collectors.toList());
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return Collections.emptyList();
+            }
+            throw new MFTConsulClientException("Error in fetching active 
transfers for agent " + agentInfo.getId(), e);
+        } catch (Exception e) {
+            throw new MFTConsulClientException("Error in fetching active 
transfers for agent " + agentInfo.getId(), e);
+        }
+    }
+
     /**
      * Agents should call this method to submit {@link TransferState}. These 
status are received by the controller and reorder
      * status messages and put in the final status array.
@@ -237,12 +258,8 @@ public class MFTConsulClient {
      */
     public void saveTransferState(String transferId, TransferState 
transferState) throws MFTConsulClientException {
         try {
-            List<TransferState> allStates = getTransferStates(transferId);
-            // TODO implement sequence consistency
-            allStates.add(transferState);
-            String asStr = mapper.writeValueAsString(allStates);
-            kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);
-
+            String asStr = mapper.writeValueAsString(transferState);
+            kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + 
UUID.randomUUID().toString(), asStr);
             logger.info("Saved transfer status " + asStr);
 
         } catch (Exception e) {
@@ -287,15 +304,20 @@ public class MFTConsulClient {
      * @throws IOException
      */
     public List<TransferState> getTransferStates(String transferId) throws 
IOException {
-        Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + 
transferId);
-        List<TransferState> allStates;
-        if (valueOp.isPresent()) {
-            String prevStates = valueOp.get().getValueAsString().get();
-            allStates = new 
ArrayList<>(Arrays.asList(mapper.readValue(prevStates, TransferState[].class)));
-        } else {
-            allStates = new ArrayList<>();
+        List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);
+
+        List<TransferState> allStates = new ArrayList<>();
+
+        for (String key: keys) {
+            Optional<Value> valueOp = kvClient.getValue(key);
+            String stateAsStr = valueOp.get().getValueAsString().get();
+            TransferState transferState = mapper.readValue(stateAsStr, 
TransferState.class);
+            allStates.add(transferState);
         }
-        return allStates;
+        List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) 
->
+                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
+                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 
1).collect(Collectors.toList());
+        return sortedStates;
     }
 
     public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException 
{
diff --git 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
index db350f7..e015702 100644
--- 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
+++ 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -24,6 +24,7 @@ public class AgentInfo {
     private String host;
     private String user;
     private boolean sudo;
+    private String sessionId;
     private List<String> supportedProtocols;
     private List<String> localStorages;
 
@@ -80,4 +81,13 @@ public class AgentInfo {
         this.localStorages = localStorages;
         return this;
     }
+
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    public AgentInfo setSessionId(String sessionId) {
+        this.sessionId = sessionId;
+        return this;
+    }
 }
diff --git 
a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
 
b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index 2576cee..77fb123 100644
--- 
a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ 
b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -25,6 +25,7 @@ import com.orbitz.consul.cache.KVCache;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.MFTConsulClient;
 import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
 import org.slf4j.Logger;
@@ -40,10 +41,12 @@ import javax.annotation.PreDestroy;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @SpringBootApplication()
 @ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -247,7 +250,19 @@ public class MFTController implements CommandLineRunner {
                 selectedAgent = possibleAgent.get();
             }
         } else if (!transferRequest.getAffinityTransfer()){
-            selectedAgent = liveAgentIds.get(0);
+            List<Optional<AgentInfo>> agentInfos = 
liveAgentIds.stream().map(id -> 
mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
+            int transferCount = -1;
+            for (Optional<AgentInfo> agentInfo : agentInfos) {
+                if (agentInfo.isPresent()) {
+                    if (transferCount == -1) {
+                        transferCount = 
mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+                        selectedAgent = agentInfo.get().getId();
+                    } else if (transferCount > 
mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size()) {
+                        transferCount = 
mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+                        selectedAgent = agentInfo.get().getId();
+                    }
+                }
+            }
         }
 
         if (selectedAgent == null) {

Reply via email to