This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e26c8  Cleaning up Agent implmentation
c3e26c8 is described below

commit c3e26c8ae1ac1516a9837534c8e2efaf0bc1da66
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sun Nov 27 17:12:31 2022 -0500

    Cleaning up Agent implmentation
---
 .../org/apache/airavata/mft/agent/AgentUtil.java   |  56 ++++
 .../org/apache/airavata/mft/agent/AppConfig.java   |  12 +
 .../org/apache/airavata/mft/agent/MFTAgent.java    | 348 +--------------------
 .../airavata/mft/agent/TransferOrchestrator.java   | 209 +++++++++++++
 .../mft/agent/ingress/ConsulIngressHandler.java    | 249 +++++++++++++++
 5 files changed, 528 insertions(+), 346 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AgentUtil.java 
b/agent/src/main/java/org/apache/airavata/mft/agent/AgentUtil.java
new file mode 100644
index 0000000..a9b1064
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AgentUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+public final class AgentUtil {
+    @FunctionalInterface
+    public interface ThrowingBiConsumer<T, U, E extends Exception> {
+        void accept(T t, U u) throws E;
+    }
+
+    @FunctionalInterface
+    public interface ThrowingConsumer<T, E extends Exception> {
+        void accept(T t) throws E;
+    }
+
+    public static <T, U> BiConsumer<T, U> throwingBiConsumerWrapper(
+            ThrowingBiConsumer<T, U, Exception> throwingBiConsumer) {
+
+        return (i, j) -> {
+            try {
+                throwingBiConsumer.accept(i, j);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        };
+    }
+    public static <T> Consumer<T> throwingConsumerWrapper(
+            ThrowingConsumer<T, Exception> throwingConsumer) {
+
+        return i -> {
+            try {
+                throwingConsumer.accept(i);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        };
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java 
b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
index e1ee354..e7b8fbc 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
@@ -19,6 +19,7 @@ package org.apache.airavata.mft.agent;
 
 import org.apache.airavata.mft.admin.MFTConsulClient;
 import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
+import org.apache.airavata.mft.agent.ingress.ConsulIngressHandler;
 import org.apache.airavata.mft.agent.rpc.RPCParser;
 import org.apache.airavata.mft.resource.client.StorageServiceClient;
 import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
@@ -60,4 +61,15 @@ public class AppConfig {
         return StorageServiceClientBuilder.buildClient(resourceServiceHost, 
resourceServicePort);
     }
 
+    @Bean
+    public ConsulIngressHandler consulIngressHandler() {
+        return new ConsulIngressHandler();
+
+    }
+
+    @Bean
+    public TransferOrchestrator transferOrchestrator() {
+        return new TransferOrchestrator();
+    }
+
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java 
b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 2bc5ff9..1353ca0 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -17,32 +17,10 @@
 
 package org.apache.airavata.mft.agent;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.util.JsonFormat;
-import com.orbitz.consul.ConsulException;
-import com.orbitz.consul.cache.ConsulCache;
-import com.orbitz.consul.cache.KVCache;
-import com.orbitz.consul.model.kv.Value;
-import com.orbitz.consul.model.session.ImmutableSession;
-import com.orbitz.consul.model.session.SessionCreatedResponse;
-import org.apache.airavata.mft.admin.MFTConsulClient;
-import org.apache.airavata.mft.admin.MFTConsulClientException;
-import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferState;
-import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
 import org.apache.airavata.mft.agent.http.HttpServer;
 import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
-import org.apache.airavata.mft.agent.rpc.RPCParser;
 import org.apache.airavata.mft.api.service.CallbackEndpoint;
-import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-import org.apache.airavata.mft.resource.client.StorageServiceClient;
-import 
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
-import 
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -51,12 +29,9 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 
-import java.util.ArrayList;
-import java.util.Arrays;
+import javax.annotation.PreDestroy;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
 
 @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
 public class MFTAgent implements CommandLineRunner {
@@ -66,252 +41,22 @@ public class MFTAgent implements CommandLineRunner {
     @org.springframework.beans.factory.annotation.Value("${agent.id}")
     private String agentId;
 
-    @org.springframework.beans.factory.annotation.Value("${agent.secret}")
-    private String agentSecret;
-
     @org.springframework.beans.factory.annotation.Value("${agent.host}")
     private String agentHost;
 
-    @org.springframework.beans.factory.annotation.Value("${agent.user}")
-    private String agentUser;
-
     @org.springframework.beans.factory.annotation.Value("${agent.http.port}")
     private Integer agentHttpPort;
 
     
@org.springframework.beans.factory.annotation.Value("${agent.https.enabled}")
     private boolean agentHttpsEnabled;
 
-    
@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
-    private String supportedProtocols;
-
-    
@org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
-    private String tempDataDir = "/tmp";
-
-    
@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
-    private String resourceServiceHost;
-
-    
@org.springframework.beans.factory.annotation.Value("${resource.service.port}")
-    private int resourceServicePort;
-
-    
@org.springframework.beans.factory.annotation.Value("${secret.service.host}")
-    private String secretServiceHost;
-
-    
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
-    private int secretServicePort;
-
-    
@org.springframework.beans.factory.annotation.Value("${agent.concurrent.transfers}")
-    private int concurrentTransfers;
-
-    
@org.springframework.beans.factory.annotation.Value("${agent.concurrent.chunked.threads}")
-    private int concurrentChunkedThreads;
-
-    @org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
-    private int chunkedSize;
-
-    
@org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
-    private boolean doChunkStream;
-
     private final Semaphore mainHold = new Semaphore(0);
 
-    private KVCache transferMessageCache;
-    private KVCache rpcMessageCache;
-
-    private ConsulCache.Listener<String, Value> transferCacheListener;
-    private ConsulCache.Listener<String, Value> rpcCacheListener;
-
-    private final ScheduledExecutorService sessionRenewPool = 
Executors.newSingleThreadScheduledExecutor();
-    private long sessionRenewSeconds = 4;
-    private long sessionTTLSeconds = 10;
-    private String session;
-    private ExecutorService transferRequestExecutor;
-
-
-    private TransportMediator mediator;
-
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    @Autowired
-    private RPCParser rpcParser;
-
-    @Autowired
-    private MFTConsulClient mftConsulClient;
-
     @Autowired
     private HttpTransferRequestsStore transferRequestsStore;
 
-    @Autowired
-    private StorageServiceClient storageServiceClient;
-
-    private final AtomicLong totalRunningTransfers = new AtomicLong(0);
-    private final AtomicLong totalPendingTransfers = new AtomicLong(0);
-
     public void init() {
-        transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), 
MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
-        rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), 
MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
-        mediator = new TransportMediator(tempDataDir,
-                concurrentTransfers,
-                concurrentChunkedThreads,
-                chunkedSize, doChunkStream);
-        transferRequestExecutor = 
Executors.newFixedThreadPool(concurrentTransfers);
-    }
-
-    private void acceptRPCRequests() {
-        rpcCacheListener = newValues -> {
-            newValues.values().forEach(value -> {
-                Optional<String> decodedValue = value.getValueAsString();
-                decodedValue.ifPresent(v -> {
-                    try {
-                        SyncRPCRequest rpcRequest = mapper.readValue(v, 
SyncRPCRequest.class);
-                        
mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), 
rpcParser.processRPCRequest(rpcRequest));
-                    } catch (Throwable e) {
-                        logger.error("Error processing the RPC request {}", 
value.getKey(), e);
-                    } finally {
-                        
mftConsulClient.getKvClient().deleteKey(value.getKey());
-                    }
-                });
-            });
-        };
-
-        rpcMessageCache.addListener(rpcCacheListener);
-        rpcMessageCache.start();
-    }
-
-    private void processTransfer(String transferId, String 
transferRequestJson) {
-        logger.info("Received raw message: {}", transferRequestJson);
-        TransferApiRequest request = null;
-        try {
-            TransferApiRequest.Builder builder = 
TransferApiRequest.newBuilder();
-            JsonFormat.parser().merge(transferRequestJson, builder);
-            request = builder.build();
-
-            long running = totalRunningTransfers.incrementAndGet();
-            long pending = totalPendingTransfers.decrementAndGet();
-            logger.info("Received request {}. Total Running {}. Total Pending 
{}", transferId, running, pending);
-
-            mftConsulClient.submitTransferStateToProcess(transferId, agentId, 
new TransferState()
-                    .setState("STARTING")
-                    .setPercentage(0)
-                    .setUpdateTimeMils(System.currentTimeMillis())
-                    .setPublisher(agentId)
-                    .setDescription("Starting the transfer"));
-
-            StorageTypeResolveResponse sourceStorageType = 
storageServiceClient.common()
-                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
-                    .setStorageId(request.getSourceStorageId()).build());
-            Optional<MetadataCollector> srcMetadataCollectorOp = 
MetadataCollectorResolver
-                    
.resolveMetadataCollector(sourceStorageType.getStorageType());
-            MetadataCollector srcMetadataCollector = 
srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a 
metadata collector for source"));
-            srcMetadataCollector.init(resourceServiceHost, 
resourceServicePort, secretServiceHost, secretServicePort);
 
-            StorageTypeResolveResponse destStorageType = 
storageServiceClient.common()
-                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
-                    .setStorageId(request.getSourceStorageId()).build());
-
-            FileResourceMetadata srcMetadata = 
srcMetadataCollector.getFileResourceMetadata(
-                    request.getMftAuthorizationToken(),
-                    request.getSourcePath(),
-                    request.getSourceStorageId(),
-                    request.getSourceToken());
-
-
-            ConnectorConfig srcCC = 
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
-                    .withAuthToken(request.getMftAuthorizationToken())
-                    .withResourceServiceHost(resourceServiceHost)
-                    .withResourceServicePort(resourceServicePort)
-                    .withSecretServiceHost(secretServiceHost)
-                    .withSecretServicePort(secretServicePort)
-                    .withTransferId(transferId)
-                    .withStorageId(request.getSourceStorageId())
-                    .withResourcePath(request.getSourcePath())
-                    .withStorageType(sourceStorageType.getStorageType())
-                    .withCredentialToken(request.getSourceToken())
-                    .withMetadata(srcMetadata).build();
-
-            ConnectorConfig dstCC = 
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
-                    .withAuthToken(request.getMftAuthorizationToken())
-                    .withResourceServiceHost(resourceServiceHost)
-                    .withResourceServicePort(resourceServicePort)
-                    .withSecretServiceHost(secretServiceHost)
-                    .withSecretServicePort(secretServicePort)
-                    .withTransferId(transferId)
-                    .withStorageId(request.getDestinationStorageId())
-                    .withResourcePath(request.getDestinationPath())
-                    .withStorageType(destStorageType.getStorageType())
-                    .withCredentialToken(request.getDestinationToken())
-                    .withMetadata(srcMetadata).build();
-
-            mftConsulClient.submitTransferStateToProcess(transferId, agentId, 
new TransferState()
-                    .setState("STARTED")
-                    .setPercentage(0)
-                    .setUpdateTimeMils(System.currentTimeMillis())
-                    .setPublisher(agentId)
-                    .setDescription("Started the transfer"));
-
-            // Save transfer metadata in scheduled path to recover in case of 
an Agent failures. Recovery is done from controller
-            
mftConsulClient.getKvClient().putValue(MFTConsulClient.AGENTS_SCHEDULED_PATH + 
agentId + "/" + session + "/" + transferId, transferRequestJson);
-
-            mediator.transferSingleThread(transferId, request, srcCC, dstCC,
-                    (id, st) -> {
-                        try {
-                            mftConsulClient.submitTransferStateToProcess(id, 
agentId, st.setPublisher(agentId));
-
-                        } catch (MFTConsulClientException e) {
-                            logger.error("Failed while updating transfer 
state", e);
-                        }
-                    },
-                    (id, transferSuccess) -> {
-                        try {
-                            // Delete scheduled key as the transfer completed 
/ failed if it was placed in current session
-                            
mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH + 
agentId + "/" + session + "/" + id);
-                            long pendingAfter = 
totalRunningTransfers.decrementAndGet();
-                            logger.info("Removed transfer {} from queue with 
transfer success = {}. Total running {}",
-                                    id, transferSuccess, pendingAfter);
-                        } catch (Exception e) {
-                            logger.error("Failed while deleting scheduled path 
for transfer {}", id);
-                        }
-                    });
-
-
-        } catch (Throwable e) {
-            if (request != null) {
-                try {
-                    logger.error("Error in submitting transfer {}", 
transferId, e);
-
-                    mftConsulClient.submitTransferStateToProcess(transferId, 
agentId, new TransferState()
-                            .setState("FAILED")
-                            .setPercentage(0)
-                            .setUpdateTimeMils(System.currentTimeMillis())
-                            .setPublisher(agentId)
-                            .setDescription(ExceptionUtils.getStackTrace(e)));
-                } catch (MFTConsulClientException ex) {
-                    logger.warn(ex.getMessage());
-                    // Ignore
-                }
-            } else {
-                logger.error("Unknown error in processing message {}", 
transferRequestJson, e);
-            }
-        } finally {
-            //logger.info("Deleting key " + consulEntryKey);
-            //mftConsulClient.getKvClient().deleteKey(consulEntryKey); // Due 
to bug in consul https://github.com/hashicorp/consul/issues/571
-        }
-    }
-    private void acceptTransferRequests() {
-
-        transferCacheListener = newValues -> {
-            newValues.values().forEach(value -> {
-                Optional<String> decodedValue = value.getValueAsString();
-                String transferId = 
value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
-                decodedValue.ifPresent(v -> {
-                    mftConsulClient.getKvClient().deleteKey(value.getKey());
-                    long totalPending = 
totalPendingTransfers.incrementAndGet();
-                    logger.info("Total pending transfers {}", totalPending);
-                    transferRequestExecutor.submit(() -> 
processTransfer(transferId, v));
-                });
-            });
-        };
-        transferMessageCache.addListener(transferCacheListener);
-        transferMessageCache.start();
     }
 
     private void handleCallbacks(List<CallbackEndpoint> callbackEndpoints, 
String transferId, TransferState transferState) {
@@ -340,103 +85,14 @@ public class MFTAgent implements CommandLineRunner {
         }).start();
     }
 
-    private boolean connectAgent() throws MFTConsulClientException {
-        final ImmutableSession session = ImmutableSession.builder()
-                .name(agentId)
-                .behavior("delete")
-                .ttl(sessionTTLSeconds + "s").build();
-
-        final SessionCreatedResponse sessResp = 
mftConsulClient.getSessionClient().createSession(session);
-        final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;
-
-        boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath, 
sessResp.getId());
-
-        if (acquired) {
-            this.session = sessResp.getId();
-            sessionRenewPool.scheduleAtFixedRate(() -> {
-                try {
-                    
mftConsulClient.getSessionClient().renewSession(sessResp.getId());
-                } catch (ConsulException e) {
-                    if (e.getCode() == 404) {
-                        logger.error("Can not renew session as it is expired");
-                        stop();
-                    }
-                    logger.warn("Errored while renewing the session", e);
-                    try {
-                        boolean status = 
mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
-                        if (!status) {
-                            logger.error("Can not renew session as it is 
expired");
-                            stop();
-                        }
-                    } catch (Exception ex) {
-                        logger.error("Can not renew session as it is expired");
-                        stop();
-                    }
-                } catch (Exception e) {
-                    try {
-                        boolean status = 
mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
-                        if (!status) {
-                            logger.error("Can not renew session as it is 
expired");
-                            stop();
-                        }
-                    } catch (Exception ex) {
-                        logger.error("Can not renew session as it is expired");
-                        stop();
-                    }
-                }
-            }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
-
-            this.mftConsulClient.registerAgent(new AgentInfo()
-                    .setId(agentId)
-                    .setHost(agentHost)
-                    .setUser(agentUser)
-                    .setSessionId(this.session)
-                    
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
-                    .setLocalStorages(new ArrayList<>()));
-        }
-
-        logger.info("Acquired lock " + acquired);
-        return acquired;
-    }
-
-    public void disconnectAgent() {
-        sessionRenewPool.shutdown();
-        if (transferCacheListener != null) {
-            transferMessageCache.removeListener(transferCacheListener);
-        }
-
-        if (rpcCacheListener != null) {
-            rpcMessageCache.removeListener(rpcCacheListener);
-        }
-    }
-
+    @PreDestroy
     public void stop() {
         logger.info("Stopping Agent " + agentId);
-        disconnectAgent();
         mainHold.release();
-        transferRequestExecutor.shutdown();
     }
 
     public void start() throws Exception {
         init();
-        boolean connected = false;
-        int connectionRetries = 0;
-        while (!connected) {
-            connected = connectAgent();
-            if (connected) {
-                logger.info("Successfully connected to consul with session id 
{}", session);
-            } else {
-                logger.info("Retrying to connect to consul");
-                Thread.sleep(5000);
-                connectionRetries++;
-                if (connectionRetries > 10) {
-                    throw new Exception("Failed to connect to the cluster");
-                }
-            }
-        }
-
-        acceptTransferRequests();
-        acceptRPCRequests();
         acceptHTTPRequests();
     }
 
diff --git 
a/agent/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java 
b/agent/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
new file mode 100644
index 0000000..81c7e66
--- /dev/null
+++ 
b/agent/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent;
+
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+import org.apache.airavata.mft.core.MetadataCollectorResolver;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import 
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import 
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+public class TransferOrchestrator {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TransferOrchestrator.class);
+
+    private final AtomicLong totalRunningTransfers = new AtomicLong(0);
+    private final AtomicLong totalPendingTransfers = new AtomicLong(0);
+
+    
@org.springframework.beans.factory.annotation.Value("${agent.concurrent.transfers}")
+    private int concurrentTransfers;
+
+    private ExecutorService transferRequestExecutor;
+
+    private TransportMediator mediator;
+
+    
@org.springframework.beans.factory.annotation.Value("${agent.concurrent.chunked.threads}")
+    private int concurrentChunkedThreads;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
+    private int chunkedSize;
+
+    
@org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
+    private boolean doChunkStream;
+
+    
@org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
+    private String tempDataDir = "/tmp";
+
+    
@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
+    private String resourceServiceHost;
+
+    
@org.springframework.beans.factory.annotation.Value("${resource.service.port}")
+    private int resourceServicePort;
+
+    
@org.springframework.beans.factory.annotation.Value("${secret.service.host}")
+    private String secretServiceHost;
+
+    
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
+    private int secretServicePort;
+
+    @Autowired
+    private StorageServiceClient storageServiceClient;
+
+    @PostConstruct
+    public void init() {
+        transferRequestExecutor  = 
Executors.newFixedThreadPool(concurrentTransfers);
+        mediator = new TransportMediator(tempDataDir,
+                concurrentTransfers,
+                concurrentChunkedThreads,
+                chunkedSize, doChunkStream);
+        logger.info("Transfer orchestrator initialized");
+    }
+
+    @PreDestroy
+    public void destroy() {
+        transferRequestExecutor.shutdown();
+        logger.info("Transfer orchestrator turned off");
+    }
+
+    public void submitTransferToProcess(String transferId, TransferApiRequest 
request,
+                                        BiConsumer<String, TransferState> 
updateStatus,
+                                        Consumer<Boolean> createTransferHook) {
+        long totalPending = totalPendingTransfers.incrementAndGet();
+        logger.info("Total pending transfers {}", totalPending);
+        transferRequestExecutor.submit(() -> processTransfer(transferId, 
request, updateStatus, createTransferHook));
+    }
+
+    public void processTransfer(String transferId, TransferApiRequest request,
+                                BiConsumer<String, TransferState> 
updateStatus, Consumer<Boolean> createTransferHook) {
+        try {
+
+            long running = totalRunningTransfers.incrementAndGet();
+            long pending = totalPendingTransfers.decrementAndGet();
+            logger.info("Received request {}. Total Running {}. Total Pending 
{}", transferId, running, pending);
+
+            updateStatus.accept(transferId, new TransferState()
+                    .setState("STARTING")
+                    .setPercentage(0)
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setDescription("Starting the transfer"));
+
+            StorageTypeResolveResponse sourceStorageType = 
storageServiceClient.common()
+                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
+                            
.setStorageId(request.getSourceStorageId()).build());
+            Optional<MetadataCollector> srcMetadataCollectorOp = 
MetadataCollectorResolver
+                    
.resolveMetadataCollector(sourceStorageType.getStorageType());
+            MetadataCollector srcMetadataCollector = 
srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a 
metadata collector for source"));
+            srcMetadataCollector.init(resourceServiceHost, 
resourceServicePort, secretServiceHost, secretServicePort);
+
+            StorageTypeResolveResponse destStorageType = 
storageServiceClient.common()
+                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
+                            
.setStorageId(request.getSourceStorageId()).build());
+
+            FileResourceMetadata srcMetadata = 
srcMetadataCollector.getFileResourceMetadata(
+                    request.getMftAuthorizationToken(),
+                    request.getSourcePath(),
+                    request.getSourceStorageId(),
+                    request.getSourceToken());
+
+
+            ConnectorConfig srcCC = 
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                    .withAuthToken(request.getMftAuthorizationToken())
+                    .withResourceServiceHost(resourceServiceHost)
+                    .withResourceServicePort(resourceServicePort)
+                    .withSecretServiceHost(secretServiceHost)
+                    .withSecretServicePort(secretServicePort)
+                    .withTransferId(transferId)
+                    .withStorageId(request.getSourceStorageId())
+                    .withResourcePath(request.getSourcePath())
+                    .withStorageType(sourceStorageType.getStorageType())
+                    .withCredentialToken(request.getSourceToken())
+                    .withMetadata(srcMetadata).build();
+
+            ConnectorConfig dstCC = 
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                    .withAuthToken(request.getMftAuthorizationToken())
+                    .withResourceServiceHost(resourceServiceHost)
+                    .withResourceServicePort(resourceServicePort)
+                    .withSecretServiceHost(secretServiceHost)
+                    .withSecretServicePort(secretServicePort)
+                    .withTransferId(transferId)
+                    .withStorageId(request.getDestinationStorageId())
+                    .withResourcePath(request.getDestinationPath())
+                    .withStorageType(destStorageType.getStorageType())
+                    .withCredentialToken(request.getDestinationToken())
+                    .withMetadata(srcMetadata).build();
+
+            updateStatus.accept(transferId, new TransferState()
+                    .setState("STARTED")
+                    .setPercentage(0)
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setDescription("Started the transfer"));
+
+            // Save transfer metadata in scheduled path to recover in case of 
an Agent failures. Recovery is done from controller
+            createTransferHook.accept(true);
+
+            mediator.transferSingleThread(transferId, request, srcCC, dstCC, 
updateStatus,
+                    (id, transferSuccess) -> {
+                        try {
+                            // Delete scheduled key as the transfer completed 
/ failed if it was placed in current session
+                            createTransferHook.accept(false);
+                            long pendingAfter = 
totalRunningTransfers.decrementAndGet();
+                            logger.info("Removed transfer {} from queue with 
transfer success = {}. Total running {}",
+                                    id, transferSuccess, pendingAfter);
+                        } catch (Exception e) {
+                            logger.error("Failed while deleting scheduled path 
for transfer {}", id);
+                        }
+                    });
+
+
+        } catch (Throwable e) {
+            if (request != null) {
+                logger.error("Error in submitting transfer {}", transferId, e);
+
+                updateStatus.accept(transferId, new TransferState()
+                        .setState("FAILED")
+                        .setPercentage(0)
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription(ExceptionUtils.getStackTrace(e)));
+
+            } else {
+                logger.error("Unknown error in processing message {}", 
request.toString(), e);
+            }
+        } finally {
+            //logger.info("Deleting key " + consulEntryKey);
+            //mftConsulClient.getKvClient().deleteKey(consulEntryKey); // Due 
to bug in consul https://github.com/hashicorp/consul/issues/571
+        }
+    }
+
+}
diff --git 
a/agent/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java
 
b/agent/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java
new file mode 100644
index 0000000..f3ffdd2
--- /dev/null
+++ 
b/agent/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.ingress;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.util.JsonFormat;
+import com.orbitz.consul.ConsulException;
+import com.orbitz.consul.cache.ConsulCache;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
+import org.apache.airavata.mft.agent.AgentUtil;
+import org.apache.airavata.mft.agent.TransferOrchestrator;
+import org.apache.airavata.mft.agent.rpc.RPCParser;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class ConsulIngressHandler {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ConsulIngressHandler.class);
+
+    private KVCache transferMessageCache;
+    private KVCache rpcMessageCache;
+
+    private ConsulCache.Listener<String, Value> transferCacheListener;
+    private ConsulCache.Listener<String, Value> rpcCacheListener;
+    @Autowired
+    private MFTConsulClient mftConsulClient;
+    @Autowired
+    private RPCParser rpcParser;
+
+    private String session;
+
+    private final ScheduledExecutorService sessionRenewPool = 
Executors.newSingleThreadScheduledExecutor();
+
+    private long sessionRenewSeconds = 4;
+    private long sessionTTLSeconds = 10;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @org.springframework.beans.factory.annotation.Value("${agent.id}")
+    private String agentId;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.secret}")
+    private String agentSecret;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.host}")
+    private String agentHost;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.user}")
+    private String agentUser;
+
+    
@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
+    private String supportedProtocols;
+
+    @Autowired
+    private TransferOrchestrator transferOrchestrator;
+
+    private void acceptTransferRequests() {
+
+        transferCacheListener = newValues -> {
+            newValues.values().forEach(value -> {
+                Optional<String> decodedValue = value.getValueAsString();
+                String transferId = 
value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
+                decodedValue.ifPresent(reqJson -> {
+                    mftConsulClient.getKvClient().deleteKey(value.getKey());
+                    TransferApiRequest.Builder builder = 
TransferApiRequest.newBuilder();
+                    try {
+                        JsonFormat.parser().merge(reqJson, builder);
+
+                    } catch (Exception e) {
+                        logger.error("Failed to parse json string {} into a 
transfer request", reqJson);
+                        return;
+                    }
+                    TransferApiRequest request = builder.build();
+                    transferOrchestrator.submitTransferToProcess(transferId, 
request,
+                            AgentUtil.throwingBiConsumerWrapper((id, st) -> {
+                                
mftConsulClient.submitTransferStateToProcess(id, agentId, 
st.setPublisher(agentId));
+                            }),
+                            AgentUtil.throwingConsumerWrapper(create -> {
+                                if (create) {
+                                    
mftConsulClient.getKvClient().putValue(MFTConsulClient.AGENTS_SCHEDULED_PATH + 
agentId + "/" + session + "/" + transferId, reqJson);
+                                } else {
+                                    
mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH + 
agentId + "/" + session + "/" + transferId);
+                                }
+                            }));
+                });
+            });
+        };
+        transferMessageCache.addListener(transferCacheListener);
+        transferMessageCache.start();
+    }
+
+    private void acceptRPCRequests() {
+        rpcCacheListener = newValues -> {
+            newValues.values().forEach(value -> {
+                Optional<String> decodedValue = value.getValueAsString();
+                decodedValue.ifPresent(v -> {
+                    try {
+                        SyncRPCRequest rpcRequest = mapper.readValue(v, 
SyncRPCRequest.class);
+                        SyncRPCResponse syncRPCResponse = 
rpcParser.processRPCRequest(rpcRequest);
+                        
mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), 
syncRPCResponse);
+                    } catch (Throwable e) {
+                        logger.error("Error processing the RPC request {}", 
value.getKey(), e);
+                    } finally {
+                        
mftConsulClient.getKvClient().deleteKey(value.getKey());
+                    }
+                });
+            });
+        };
+
+        rpcMessageCache.addListener(rpcCacheListener);
+        rpcMessageCache.start();
+    }
+
+    private boolean connectAgent() throws MFTConsulClientException {
+        final ImmutableSession session = ImmutableSession.builder()
+                .name(agentId)
+                .behavior("delete")
+                .ttl(sessionTTLSeconds + "s").build();
+
+        final SessionCreatedResponse sessResp = 
mftConsulClient.getSessionClient().createSession(session);
+        final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;
+
+        boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath, 
sessResp.getId());
+
+        if (acquired) {
+            this.session = sessResp.getId();
+            sessionRenewPool.scheduleAtFixedRate(() -> {
+                try {
+                    
mftConsulClient.getSessionClient().renewSession(sessResp.getId());
+                } catch (ConsulException e) {
+                    if (e.getCode() == 404) {
+                        logger.error("Can not renew session as it is expired");
+                        destroy();
+                    }
+                    logger.warn("Errored while renewing the session", e);
+                    try {
+                        boolean status = 
mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
+                        if (!status) {
+                            logger.error("Can not renew session as it is 
expired");
+                            destroy();
+                        }
+                    } catch (Exception ex) {
+                        logger.error("Can not renew session as it is expired");
+                        destroy();
+                    }
+                } catch (Exception e) {
+                    try {
+                        boolean status = 
mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
+                        if (!status) {
+                            logger.error("Can not renew session as it is 
expired");
+                            destroy();
+                        }
+                    } catch (Exception ex) {
+                        logger.error("Can not renew session as it is expired");
+                        destroy();
+                    }
+                }
+            }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
+
+            this.mftConsulClient.registerAgent(new AgentInfo()
+                    .setId(agentId)
+                    .setHost(agentHost)
+                    .setUser(agentUser)
+                    .setSessionId(this.session)
+                    
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
+                    .setLocalStorages(new ArrayList<>()));
+        }
+
+        logger.info("Acquired lock " + acquired);
+        return acquired;
+    }
+
+    @PostConstruct
+    public void init() throws Exception {
+        transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), 
MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
+        rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), 
MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
+
+        boolean connected = false;
+        int connectionRetries = 0;
+        while (!connected) {
+            connected = connectAgent();
+            if (connected) {
+                logger.info("Successfully connected to consul with session id 
{}", session);
+            } else {
+                logger.info("Retrying to connect to consul");
+                Thread.sleep(5000);
+                connectionRetries++;
+                if (connectionRetries > 10) {
+                    throw new Exception("Failed to connect to the cluster");
+                }
+            }
+        }
+
+        acceptTransferRequests();
+        acceptRPCRequests();
+        logger.info("Consul ingress handler initialized");
+
+    }
+
+    @PreDestroy
+    public void destroy() {
+        if (!sessionRenewPool.isShutdown())
+            sessionRenewPool.shutdown();
+        if (transferCacheListener != null) {
+            transferMessageCache.removeListener(transferCacheListener);
+        }
+
+        if (rpcCacheListener != null) {
+            rpcMessageCache.removeListener(rpcCacheListener);
+        }
+        logger.info("Consul ingress handler turned off");
+    }
+
+}


Reply via email to