This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 96309a4e349414fa032e5a37adf10fe3d0ea6f14 Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Tue Sep 27 21:17:37 2022 -0400 Logging improvements --- .../java/org/apache/airavata/mft/agent/MFTAgent.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java index 9490225..a817f91 100644 --- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java +++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java @@ -48,14 +48,12 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) public class MFTAgent implements CommandLineRunner { @@ -139,6 +137,9 @@ public class MFTAgent implements CommandLineRunner { @Autowired private HttpTransferRequestsStore transferRequestsStore; + private final AtomicLong totalRunningTransfers = new AtomicLong(0); + private final AtomicLong totalPendingTransfers = new AtomicLong(0); + public void init() { transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId); rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId); @@ -178,7 +179,10 @@ public class MFTAgent implements CommandLineRunner { JsonFormat.parser().merge(transferRequestJson, builder); request = builder.build(); - logger.info("Received request " + transferId); + long running = totalRunningTransfers.incrementAndGet(); + long pending = totalPendingTransfers.decrementAndGet(); + logger.info("Received request {}. Total Running {}. Total Pending {}", transferId, running, pending); + mftConsulClient.submitTransferStateToProcess(transferId, agentId, new TransferState() .setState("STARTING") .setPercentage(0) @@ -245,6 +249,9 @@ public class MFTAgent implements CommandLineRunner { try { // Delete scheduled key as the transfer completed / failed if it was placed in current session mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId + "/" + session + "/" + id); + long pendingAfter = totalRunningTransfers.decrementAndGet(); + logger.info("Removed transfer {} from queue with transfer success = {}. Total running {}", + id, transferSuccess, pendingAfter); } catch (Exception e) { logger.error("Failed while deleting scheduled path for transfer {}", id); } @@ -282,6 +289,8 @@ public class MFTAgent implements CommandLineRunner { String transferId = value.getKey().substring(value.getKey().lastIndexOf("/") + 1); decodedValue.ifPresent(v -> { mftConsulClient.getKvClient().deleteKey(value.getKey()); + long totalPending = totalPendingTransfers.incrementAndGet(); + logger.info("Total pending transfers {}", totalPending); transferRequestExecutor.submit(() -> processTransfer(transferId, v)); }); });
