This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b4d31  Updating pending transfer count in agents
e0b4d31 is described below

commit e0b4d31c4514c4a81eff910b8cbe9cacb7ff97fb
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Fri Feb 10 14:19:31 2023 -0500

    Updating pending transfer count in agents
---
 .../airavata/mft/agent/TransferOrchestrator.java   | 13 +++++
 .../apache/airavata/mft/admin/MFTConsulClient.java | 68 ++++++++++++++++------
 .../mft/controller/TransferDispatcher.java         | 16 ++---
 3 files changed, 71 insertions(+), 26 deletions(-)

diff --git 
a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
 
b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
index 5395876..2eed2a0 100644
--- 
a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
+++ 
b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
@@ -17,6 +17,7 @@
 
 package org.apache.airavata.mft.agent;
 
+import org.apache.airavata.mft.admin.MFTConsulClient;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.agent.stub.*;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -25,6 +26,7 @@ import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -61,6 +63,12 @@ public class TransferOrchestrator {
     
@org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
     private String tempDataDir = "/tmp";
 
+    @org.springframework.beans.factory.annotation.Value("${agent.id}")
+    private String agentId;
+
+    @Autowired
+    private MFTConsulClient mftConsulClient;
+
     @PostConstruct
     public void init() {
         transferRequestExecutor  = 
Executors.newFixedThreadPool(concurrentTransfers);
@@ -68,6 +76,7 @@ public class TransferOrchestrator {
                 concurrentTransfers,
                 concurrentChunkedThreads,
                 chunkedSize, doChunkStream);
+        mftConsulClient.updateAgentPendingTransferCount(agentId, 0);
         logger.info("Transfer orchestrator initialized");
     }
 
@@ -81,6 +90,8 @@ public class TransferOrchestrator {
                                         BiConsumer<EndpointPaths, 
TransferState> updateStatus,
                                         BiConsumer<EndpointPaths, Boolean> 
createTransferHook) {
         long totalPending = 
totalPendingTransfers.addAndGet(request.getEndpointPathsCount());
+        mftConsulClient.updateAgentPendingTransferCount(agentId, totalPending);
+
         logger.info("Total pending files to transfer {}", totalPending);
         for (EndpointPaths endpointPath : request.getEndpointPathsList()) {
 
@@ -100,6 +111,8 @@ public class TransferOrchestrator {
 
             long running = totalRunningTransfers.incrementAndGet();
             long pending = totalPendingTransfers.decrementAndGet();
+            mftConsulClient.updateAgentPendingTransferCount(agentId, pending);
+
             logger.info("Received request {}. Total Running {}. Total Pending 
{}", transferId, running, pending);
 
             updateStatus.accept(endpointPath, new TransferState()
diff --git 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index 273e6c4..d2e0252 100644
--- 
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ 
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -71,6 +71,7 @@ public class MFTConsulClient {
     public static final String TRANSFER_PROCESSED_PATH = 
"mft/transfer/processed/";
     public static final String AGENTS_TRANSFER_REQUEST_MESSAGE_PATH = 
"mft/agents/transfermessages/";
     public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
+    public static final String AGENTS_PENDING_TRANSFER_COUNT_PATH = 
"mft/agents/pendingtransfers/";
 
     public static final String CONTROLLER_STATE_MESSAGE_PATH = 
"mft/controller/messages/states/";
     public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = 
"mft/controller/messages/transfers/";
@@ -115,6 +116,32 @@ public class MFTConsulClient {
     public MFTConsulClient() {
     }
 
+    public synchronized void updateAgentPendingTransferCount(String agentId, 
long transferCount) {
+        try {
+            kvClient.putValue(AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId, 
transferCount + "");
+        } catch (Exception e) {
+            logger.error("Failed update pending transfer count {} in consul 
for agent {}. But Continuing the execution",
+                    transferCount, agentId);
+        }
+    }
+
+    /**
+     * Reads the pending transfer count stored for an agent under the Consul key
+     * {@code AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId}.
+     *
+     * @param agentId id of the agent to look up
+     * @return the stored count, or 0 when the key is absent, holds no value, or
+     *         the lookup fails with a 404
+     * @throws NumberFormatException if the stored value is not a valid long
+     * @throws ConsulException       for any Consul failure other than a 404
+     */
+    public long getAgentPendingTransferCount(String agentId) {
+        try {
+            // flatMap replaces an unchecked Optional.get(): a key that exists
+            // without a value now yields 0 instead of NoSuchElementException.
+            return kvClient.getValue(AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId)
+                    .flatMap(Value::getValueAsString)
+                    .map(Long::parseLong)
+                    .orElse(0L);
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return 0;
+            }
+            throw e;
+        }
+    }
+
     public String submitTransfer(TransferApiRequest transferRequest) throws 
MFTConsulClientException {
         try {
             String transferId = UUID.randomUUID().toString();
@@ -271,6 +298,28 @@ public class MFTConsulClient {
         }
     }
 
+    /**
+     * Counts the keys stored under
+     * {@code AGENTS_SCHEDULED_PATH + agentId + "/" + session}, where the session
+     * is the one attached to the agent's {@code LIVE_AGENTS_PATH} key.
+     *
+     * @param agentId id of the agent to inspect
+     * @return number of keys found, or 0 when the agent has no session or Consul
+     *         answers with a 404
+     * @throws MFTConsulClientException if the session lookup or the key listing
+     *                                  fails for any other reason
+     */
+    public int getEndpointHookCountForAgent(String agentId)
+            throws MFTConsulClientException {
+        try {
+            // The session lookup sits inside the try so that its failures are
+            // wrapped in MFTConsulClientException like every other error here.
+            Optional<String> sessionOp =
+                    getKvClient().getSession(LIVE_AGENTS_PATH + agentId);
+            if (sessionOp.isEmpty()) {
+                return 0;
+            }
+            List<String> transfers = getKvClient().getKeys(
+                    MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId + "/"
+                            + sessionOp.get());
+            return transfers.size();
+        } catch (ConsulException e) {
+            // A 404 means nothing is stored under the path yet.
+            if (e.getCode() == 404) {
+                return 0;
+            }
+            throw new MFTConsulClientException(
+                    "Failed to fetch endpoint hook count for agent " + agentId, e);
+        } catch (Exception e) {
+            throw new MFTConsulClientException(
+                    "Failed to fetch endpoint hook count for agent " + agentId, e);
+        }
+    }
     /**
      * Agents should call this method to submit {@link TransferState}. These 
status are received by the controller and reorder
      * status messages and put in the final status array.
@@ -418,25 +467,6 @@ public class MFTConsulClient {
                 + "/" + getEndpointPathHash(endpointPaths));
     }
 
-    public int getEndpointHookCountForAgent(String agentId) throws 
MFTConsulClientException {
-        Optional<String> session = getKvClient().getSession(LIVE_AGENTS_PATH + 
agentId);
-
-        try {
-            try {
-                return session.map(s -> 
getKvClient().getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH
-                        + agentId + "/" + s).size()).orElse(0);
-            } catch (ConsulException e) {
-                if (e.getCode() == 404) {
-                    return 0;
-                } else {
-                    throw e;
-                }
-            }
-        } catch (Exception e) {
-            throw new MFTConsulClientException("Failed to fetch endpoint hook 
count for agent " + agentId, e);
-        }
-    }
-
     public KeyValueClient getKvClient() {
         return kvClient;
     }
diff --git 
a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
 
b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
index 3e3b04d..aa0a462 100644
--- 
a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
+++ 
b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
@@ -100,21 +100,23 @@ public class TransferDispatcher {
         String selectedAgent = null;
         List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(
                 id -> 
mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
-        int transferCount = -1;
+        long transferCount = -1;
         List<String> candidates = new ArrayList<>();
 
         for (Optional<AgentInfo> agentInfo : agentInfos) {
             if (agentInfo.isPresent()) {
-                List<String> agentActiveTransfers = 
mftConsulClient.getAgentActiveTransfers(agentInfo.get());
-                logger.info("Agent {} has transfers assigned {}", 
agentInfo.get().getId(), agentActiveTransfers.size());
+                int agentActiveTransfers = 
mftConsulClient.getEndpointHookCountForAgent(agentInfo.get().getId());
+                long pendingTransferCount = 
mftConsulClient.getAgentPendingTransferCount(agentInfo.get().getId());
+                long totalTransferCount = agentActiveTransfers + 
pendingTransferCount;
+                logger.info("Agent {} has transfers assigned {}", 
agentInfo.get().getId(), totalTransferCount);
                 if (transferCount == -1) {
-                    transferCount = agentActiveTransfers.size();
+                    transferCount = totalTransferCount;
                     candidates.add(agentInfo.get().getId());
-                } else if (transferCount == agentActiveTransfers.size()) {
+                } else if (transferCount == totalTransferCount) {
                     candidates.add(agentInfo.get().getId());
-                } else if (transferCount > agentActiveTransfers.size()) {
+                } else if (transferCount > totalTransferCount) {
                     candidates = new ArrayList<>();
-                    transferCount = agentActiveTransfers.size();
+                    transferCount = totalTransferCount;
                     candidates.add(agentInfo.get().getId());
                 }
             }

Reply via email to