This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new e0b4d31 Updating pending transfer count in agents
e0b4d31 is described below
commit e0b4d31c4514c4a81eff910b8cbe9cacb7ff97fb
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Fri Feb 10 14:19:31 2023 -0500
Updating pending transfer count in agents
---
.../airavata/mft/agent/TransferOrchestrator.java | 13 +++++
.../apache/airavata/mft/admin/MFTConsulClient.java | 68 ++++++++++++++++------
.../mft/controller/TransferDispatcher.java | 16 ++---
3 files changed, 71 insertions(+), 26 deletions(-)
diff --git
a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
index 5395876..2eed2a0 100644
---
a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
+++
b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
@@ -17,6 +17,7 @@
package org.apache.airavata.mft.agent;
+import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.agent.stub.*;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -25,6 +26,7 @@ import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -61,6 +63,12 @@ public class TransferOrchestrator {
@org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
private String tempDataDir = "/tmp";
+ @org.springframework.beans.factory.annotation.Value("${agent.id}")
+ private String agentId;
+
+ @Autowired
+ private MFTConsulClient mftConsulClient;
+
@PostConstruct
public void init() {
transferRequestExecutor =
Executors.newFixedThreadPool(concurrentTransfers);
@@ -68,6 +76,7 @@ public class TransferOrchestrator {
concurrentTransfers,
concurrentChunkedThreads,
chunkedSize, doChunkStream);
+ mftConsulClient.updateAgentPendingTransferCount(agentId, 0);
logger.info("Transfer orchestrator initialized");
}
@@ -81,6 +90,8 @@ public class TransferOrchestrator {
BiConsumer<EndpointPaths,
TransferState> updateStatus,
BiConsumer<EndpointPaths, Boolean>
createTransferHook) {
long totalPending =
totalPendingTransfers.addAndGet(request.getEndpointPathsCount());
+ mftConsulClient.updateAgentPendingTransferCount(agentId, totalPending);
+
logger.info("Total pending files to transfer {}", totalPending);
for (EndpointPaths endpointPath : request.getEndpointPathsList()) {
@@ -100,6 +111,8 @@ public class TransferOrchestrator {
long running = totalRunningTransfers.incrementAndGet();
long pending = totalPendingTransfers.decrementAndGet();
+ mftConsulClient.updateAgentPendingTransferCount(agentId, pending);
+
logger.info("Received request {}. Total Running {}. Total Pending
{}", transferId, running, pending);
updateStatus.accept(endpointPath, new TransferState()
diff --git
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index 273e6c4..d2e0252 100644
---
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -71,6 +71,7 @@ public class MFTConsulClient {
public static final String TRANSFER_PROCESSED_PATH =
"mft/transfer/processed/";
public static final String AGENTS_TRANSFER_REQUEST_MESSAGE_PATH =
"mft/agents/transfermessages/";
public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
+ public static final String AGENTS_PENDING_TRANSFER_COUNT_PATH =
"mft/agents/pendingtransfers/";
public static final String CONTROLLER_STATE_MESSAGE_PATH =
"mft/controller/messages/states/";
public static final String CONTROLLER_TRANSFER_MESSAGE_PATH =
"mft/controller/messages/transfers/";
@@ -115,6 +116,32 @@ public class MFTConsulClient {
public MFTConsulClient() {
}
+    /**
+     * Stores the pending transfer count of an agent in the key-value store under
+     * {@code AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId}.
+     *
+     * <p>The write is best effort: any exception is logged and then dropped so that the
+     * caller continues running.
+     *
+     * @param agentId       id of the agent the count belongs to; used as the key suffix
+     * @param transferCount pending transfer count to store, written as a decimal string
+     */
+    public synchronized void updateAgentPendingTransferCount(String agentId, long transferCount) {
+        try {
+            kvClient.putValue(AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId, Long.toString(transferCount));
+        } catch (Exception e) {
+            // The exception is passed as the trailing argument (beyond the two placeholders)
+            // so the cause and stack trace are recorded instead of being lost.
+            logger.error("Failed update pending transfer count {} in consul for agent {}. But Continuing the execution",
+                    transferCount, agentId, e);
+        }
+    }
+
+    /**
+     * Reads the pending transfer count stored for an agent under
+     * {@code AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId}.
+     *
+     * @param agentId id of the agent whose count is requested
+     * @return the stored count, or 0 when the key is absent, the key holds no value, or the
+     *         lookup fails with a {@link ConsulException} whose code is 404
+     * @throws ConsulException       for any lookup failure other than a 404
+     * @throws NumberFormatException if the stored value is not a parsable long
+     */
+    public long getAgentPendingTransferCount(String agentId) {
+        try {
+            // flatMap covers both "no such key" and "key present but without a value",
+            // so no unchecked Optional.get() is needed.
+            return kvClient.getValue(AGENTS_PENDING_TRANSFER_COUNT_PATH + agentId)
+                    .flatMap(Value::getValueAsString)
+                    .map(Long::parseLong)
+                    .orElse(0L);
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return 0;
+            }
+            throw e;
+        }
+    }
+
public String submitTransfer(TransferApiRequest transferRequest) throws
MFTConsulClientException {
try {
String transferId = UUID.randomUUID().toString();
@@ -271,6 +298,28 @@ public class MFTConsulClient {
}
}
+    /**
+     * Counts the keys stored under {@code AGENTS_SCHEDULED_PATH + agentId + "/" + session},
+     * where the session is the one looked up for {@code LIVE_AGENTS_PATH + agentId}.
+     *
+     * @param agentId id of the agent to inspect
+     * @return the number of keys found, or 0 when the agent has no session or the lookup
+     *         fails with a {@link ConsulException} whose code is 404
+     * @throws MFTConsulClientException if the session lookup or the key listing fails for
+     *                                  any other reason
+     */
+    public int getEndpointHookCountForAgent(String agentId) throws MFTConsulClientException {
+        try {
+            // The session lookup is inside the try so that its failures are wrapped in
+            // MFTConsulClientException like every other failure of this method.
+            Optional<String> sessionOp = getKvClient().getSession(LIVE_AGENTS_PATH + agentId);
+            if (sessionOp.isEmpty()) {
+                return 0;
+            }
+            List<String> transfers = getKvClient().getKeys(AGENTS_SCHEDULED_PATH + agentId + "/" + sessionOp.get());
+            return transfers.size();
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return 0;
+            }
+            throw new MFTConsulClientException("Failed to fetch endpoint hook count for agent " + agentId, e);
+        } catch (Exception e) {
+            throw new MFTConsulClientException("Failed to fetch endpoint hook count for agent " + agentId, e);
+        }
+    }
/**
* Agents should call this method to submit {@link TransferState}. These
status are received by the controller and reorder
* status messages and put in the final status array.
@@ -418,25 +467,6 @@ public class MFTConsulClient {
+ "/" + getEndpointPathHash(endpointPaths));
}
- public int getEndpointHookCountForAgent(String agentId) throws
MFTConsulClientException {
- Optional<String> session = getKvClient().getSession(LIVE_AGENTS_PATH +
agentId);
-
- try {
- try {
- return session.map(s ->
getKvClient().getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH
- + agentId + "/" + s).size()).orElse(0);
- } catch (ConsulException e) {
- if (e.getCode() == 404) {
- return 0;
- } else {
- throw e;
- }
- }
- } catch (Exception e) {
- throw new MFTConsulClientException("Failed to fetch endpoint hook
count for agent " + agentId, e);
- }
- }
-
/**
 * Returns the {@code kvClient} field, the key-value client the methods of this class
 * read and write through.
 */
public KeyValueClient getKvClient() {
return kvClient;
}
diff --git
a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
index 3e3b04d..aa0a462 100644
---
a/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
+++
b/controller/src/main/java/org/apache/airavata/mft/controller/TransferDispatcher.java
@@ -100,21 +100,23 @@ public class TransferDispatcher {
String selectedAgent = null;
List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(
id ->
mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
- int transferCount = -1;
+ long transferCount = -1;
List<String> candidates = new ArrayList<>();
for (Optional<AgentInfo> agentInfo : agentInfos) {
if (agentInfo.isPresent()) {
- List<String> agentActiveTransfers =
mftConsulClient.getAgentActiveTransfers(agentInfo.get());
- logger.info("Agent {} has transfers assigned {}",
agentInfo.get().getId(), agentActiveTransfers.size());
+ int agentActiveTransfers =
mftConsulClient.getEndpointHookCountForAgent(agentInfo.get().getId());
+ long pendingTransferCount =
mftConsulClient.getAgentPendingTransferCount(agentInfo.get().getId());
+ long totalTransferCount = agentActiveTransfers +
pendingTransferCount;
+ logger.info("Agent {} has transfers assigned {}",
agentInfo.get().getId(), totalTransferCount);
if (transferCount == -1) {
- transferCount = agentActiveTransfers.size();
+ transferCount = totalTransferCount;
candidates.add(agentInfo.get().getId());
- } else if (transferCount == agentActiveTransfers.size()) {
+ } else if (transferCount == totalTransferCount) {
candidates.add(agentInfo.get().getId());
- } else if (transferCount > agentActiveTransfers.size()) {
+ } else if (transferCount > totalTransferCount) {
candidates = new ArrayList<>();
- transferCount = agentActiveTransfers.size();
+ transferCount = totalTransferCount;
candidates.add(agentInfo.get().getId());
}
}