This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new c3e26c8 Cleaning up Agent implmentation
c3e26c8 is described below
commit c3e26c8ae1ac1516a9837534c8e2efaf0bc1da66
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sun Nov 27 17:12:31 2022 -0500
Cleaning up Agent implmentation
---
.../org/apache/airavata/mft/agent/AgentUtil.java | 56 ++++
.../org/apache/airavata/mft/agent/AppConfig.java | 12 +
.../org/apache/airavata/mft/agent/MFTAgent.java | 348 +--------------------
.../airavata/mft/agent/TransferOrchestrator.java | 209 +++++++++++++
.../mft/agent/ingress/ConsulIngressHandler.java | 249 +++++++++++++++
5 files changed, 528 insertions(+), 346 deletions(-)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AgentUtil.java
b/agent/src/main/java/org/apache/airavata/mft/agent/AgentUtil.java
new file mode 100644
index 0000000..a9b1064
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AgentUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Adapters that allow lambdas throwing checked exceptions to be used where the standard
+ * {@link BiConsumer} and {@link Consumer} interfaces are expected.
+ */
+public final class AgentUtil {
+
+    private AgentUtil() {
+        // Holds static helpers only; never instantiated.
+    }
+
+    /**
+     * Two-argument operation that is allowed to throw a checked exception.
+     *
+     * @param <T> type of the first argument
+     * @param <U> type of the second argument
+     * @param <E> exception type the operation may throw
+     */
+    @FunctionalInterface
+    public interface ThrowingBiConsumer<T, U, E extends Exception> {
+        void accept(T t, U u) throws E;
+    }
+
+    /**
+     * Single-argument operation that is allowed to throw a checked exception.
+     *
+     * @param <T> type of the argument
+     * @param <E> exception type the operation may throw
+     */
+    @FunctionalInterface
+    public interface ThrowingConsumer<T, E extends Exception> {
+        void accept(T t) throws E;
+    }
+
+    /**
+     * Wraps a throwing two-argument operation as a plain {@link BiConsumer}.
+     *
+     * @param throwingBiConsumer operation to delegate to
+     * @return a consumer that rethrows any {@link Exception} raised by the delegate as the
+     *         cause of a new {@link RuntimeException}
+     */
+    public static <T, U> BiConsumer<T, U> throwingBiConsumerWrapper(
+            ThrowingBiConsumer<T, U, Exception> throwingBiConsumer) {
+
+        return (i, j) -> {
+            try {
+                throwingBiConsumer.accept(i, j);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        };
+    }
+
+    /**
+     * Wraps a throwing single-argument operation as a plain {@link Consumer}.
+     *
+     * @param throwingConsumer operation to delegate to
+     * @return a consumer that rethrows any {@link Exception} raised by the delegate as the
+     *         cause of a new {@link RuntimeException}
+     */
+    public static <T> Consumer<T> throwingConsumerWrapper(
+            ThrowingConsumer<T, Exception> throwingConsumer) {
+
+        return i -> {
+            try {
+                throwingConsumer.accept(i);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
+            }
+        };
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
index e1ee354..e7b8fbc 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
@@ -19,6 +19,7 @@ package org.apache.airavata.mft.agent;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
+import org.apache.airavata.mft.agent.ingress.ConsulIngressHandler;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.resource.client.StorageServiceClient;
import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
@@ -60,4 +61,15 @@ public class AppConfig {
return StorageServiceClientBuilder.buildClient(resourceServiceHost,
resourceServicePort);
}
+    /** Exposes the Consul ingress handler as a Spring bean. */
+    @Bean
+    public ConsulIngressHandler consulIngressHandler() {
+        final ConsulIngressHandler ingressHandler = new ConsulIngressHandler();
+        return ingressHandler;
+    }
+
+    /** Exposes the transfer orchestrator as a Spring bean. */
+    @Bean
+    public TransferOrchestrator transferOrchestrator() {
+        final TransferOrchestrator orchestrator = new TransferOrchestrator();
+        return orchestrator;
+    }
+
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 2bc5ff9..1353ca0 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -17,32 +17,10 @@
package org.apache.airavata.mft.agent;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.util.JsonFormat;
-import com.orbitz.consul.ConsulException;
-import com.orbitz.consul.cache.ConsulCache;
-import com.orbitz.consul.cache.KVCache;
-import com.orbitz.consul.model.kv.Value;
-import com.orbitz.consul.model.session.ImmutableSession;
-import com.orbitz.consul.model.session.SessionCreatedResponse;
-import org.apache.airavata.mft.admin.MFTConsulClient;
-import org.apache.airavata.mft.admin.MFTConsulClientException;
-import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferState;
-import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.agent.http.HttpServer;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
-import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.api.service.CallbackEndpoint;
-import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-import org.apache.airavata.mft.resource.client.StorageServiceClient;
-import
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
-import
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,12 +29,9 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import java.util.ArrayList;
-import java.util.Arrays;
+import javax.annotation.PreDestroy;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class MFTAgent implements CommandLineRunner {
@@ -66,252 +41,22 @@ public class MFTAgent implements CommandLineRunner {
@org.springframework.beans.factory.annotation.Value("${agent.id}")
private String agentId;
- @org.springframework.beans.factory.annotation.Value("${agent.secret}")
- private String agentSecret;
-
@org.springframework.beans.factory.annotation.Value("${agent.host}")
private String agentHost;
- @org.springframework.beans.factory.annotation.Value("${agent.user}")
- private String agentUser;
-
@org.springframework.beans.factory.annotation.Value("${agent.http.port}")
private Integer agentHttpPort;
@org.springframework.beans.factory.annotation.Value("${agent.https.enabled}")
private boolean agentHttpsEnabled;
-
@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
- private String supportedProtocols;
-
-
@org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
- private String tempDataDir = "/tmp";
-
-
@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
- private String resourceServiceHost;
-
-
@org.springframework.beans.factory.annotation.Value("${resource.service.port}")
- private int resourceServicePort;
-
-
@org.springframework.beans.factory.annotation.Value("${secret.service.host}")
- private String secretServiceHost;
-
-
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
- private int secretServicePort;
-
-
@org.springframework.beans.factory.annotation.Value("${agent.concurrent.transfers}")
- private int concurrentTransfers;
-
-
@org.springframework.beans.factory.annotation.Value("${agent.concurrent.chunked.threads}")
- private int concurrentChunkedThreads;
-
- @org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
- private int chunkedSize;
-
-
@org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
- private boolean doChunkStream;
-
private final Semaphore mainHold = new Semaphore(0);
- private KVCache transferMessageCache;
- private KVCache rpcMessageCache;
-
- private ConsulCache.Listener<String, Value> transferCacheListener;
- private ConsulCache.Listener<String, Value> rpcCacheListener;
-
- private final ScheduledExecutorService sessionRenewPool =
Executors.newSingleThreadScheduledExecutor();
- private long sessionRenewSeconds = 4;
- private long sessionTTLSeconds = 10;
- private String session;
- private ExecutorService transferRequestExecutor;
-
-
- private TransportMediator mediator;
-
-
- private ObjectMapper mapper = new ObjectMapper();
-
- @Autowired
- private RPCParser rpcParser;
-
- @Autowired
- private MFTConsulClient mftConsulClient;
-
@Autowired
private HttpTransferRequestsStore transferRequestsStore;
- @Autowired
- private StorageServiceClient storageServiceClient;
-
- private final AtomicLong totalRunningTransfers = new AtomicLong(0);
- private final AtomicLong totalPendingTransfers = new AtomicLong(0);
-
public void init() {
- transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(),
MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
- rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(),
MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
- mediator = new TransportMediator(tempDataDir,
- concurrentTransfers,
- concurrentChunkedThreads,
- chunkedSize, doChunkStream);
- transferRequestExecutor =
Executors.newFixedThreadPool(concurrentTransfers);
- }
-
- private void acceptRPCRequests() {
- rpcCacheListener = newValues -> {
- newValues.values().forEach(value -> {
- Optional<String> decodedValue = value.getValueAsString();
- decodedValue.ifPresent(v -> {
- try {
- SyncRPCRequest rpcRequest = mapper.readValue(v,
SyncRPCRequest.class);
-
mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(),
rpcParser.processRPCRequest(rpcRequest));
- } catch (Throwable e) {
- logger.error("Error processing the RPC request {}",
value.getKey(), e);
- } finally {
-
mftConsulClient.getKvClient().deleteKey(value.getKey());
- }
- });
- });
- };
-
- rpcMessageCache.addListener(rpcCacheListener);
- rpcMessageCache.start();
- }
-
- private void processTransfer(String transferId, String
transferRequestJson) {
- logger.info("Received raw message: {}", transferRequestJson);
- TransferApiRequest request = null;
- try {
- TransferApiRequest.Builder builder =
TransferApiRequest.newBuilder();
- JsonFormat.parser().merge(transferRequestJson, builder);
- request = builder.build();
-
- long running = totalRunningTransfers.incrementAndGet();
- long pending = totalPendingTransfers.decrementAndGet();
- logger.info("Received request {}. Total Running {}. Total Pending
{}", transferId, running, pending);
-
- mftConsulClient.submitTransferStateToProcess(transferId, agentId,
new TransferState()
- .setState("STARTING")
- .setPercentage(0)
- .setUpdateTimeMils(System.currentTimeMillis())
- .setPublisher(agentId)
- .setDescription("Starting the transfer"));
-
- StorageTypeResolveResponse sourceStorageType =
storageServiceClient.common()
- .resolveStorageType(StorageTypeResolveRequest.newBuilder()
- .setStorageId(request.getSourceStorageId()).build());
- Optional<MetadataCollector> srcMetadataCollectorOp =
MetadataCollectorResolver
-
.resolveMetadataCollector(sourceStorageType.getStorageType());
- MetadataCollector srcMetadataCollector =
srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a
metadata collector for source"));
- srcMetadataCollector.init(resourceServiceHost,
resourceServicePort, secretServiceHost, secretServicePort);
- StorageTypeResolveResponse destStorageType =
storageServiceClient.common()
- .resolveStorageType(StorageTypeResolveRequest.newBuilder()
- .setStorageId(request.getSourceStorageId()).build());
-
- FileResourceMetadata srcMetadata =
srcMetadataCollector.getFileResourceMetadata(
- request.getMftAuthorizationToken(),
- request.getSourcePath(),
- request.getSourceStorageId(),
- request.getSourceToken());
-
-
- ConnectorConfig srcCC =
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
- .withAuthToken(request.getMftAuthorizationToken())
- .withResourceServiceHost(resourceServiceHost)
- .withResourceServicePort(resourceServicePort)
- .withSecretServiceHost(secretServiceHost)
- .withSecretServicePort(secretServicePort)
- .withTransferId(transferId)
- .withStorageId(request.getSourceStorageId())
- .withResourcePath(request.getSourcePath())
- .withStorageType(sourceStorageType.getStorageType())
- .withCredentialToken(request.getSourceToken())
- .withMetadata(srcMetadata).build();
-
- ConnectorConfig dstCC =
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
- .withAuthToken(request.getMftAuthorizationToken())
- .withResourceServiceHost(resourceServiceHost)
- .withResourceServicePort(resourceServicePort)
- .withSecretServiceHost(secretServiceHost)
- .withSecretServicePort(secretServicePort)
- .withTransferId(transferId)
- .withStorageId(request.getDestinationStorageId())
- .withResourcePath(request.getDestinationPath())
- .withStorageType(destStorageType.getStorageType())
- .withCredentialToken(request.getDestinationToken())
- .withMetadata(srcMetadata).build();
-
- mftConsulClient.submitTransferStateToProcess(transferId, agentId,
new TransferState()
- .setState("STARTED")
- .setPercentage(0)
- .setUpdateTimeMils(System.currentTimeMillis())
- .setPublisher(agentId)
- .setDescription("Started the transfer"));
-
- // Save transfer metadata in scheduled path to recover in case of
an Agent failures. Recovery is done from controller
-
mftConsulClient.getKvClient().putValue(MFTConsulClient.AGENTS_SCHEDULED_PATH +
agentId + "/" + session + "/" + transferId, transferRequestJson);
-
- mediator.transferSingleThread(transferId, request, srcCC, dstCC,
- (id, st) -> {
- try {
- mftConsulClient.submitTransferStateToProcess(id,
agentId, st.setPublisher(agentId));
-
- } catch (MFTConsulClientException e) {
- logger.error("Failed while updating transfer
state", e);
- }
- },
- (id, transferSuccess) -> {
- try {
- // Delete scheduled key as the transfer completed
/ failed if it was placed in current session
-
mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH +
agentId + "/" + session + "/" + id);
- long pendingAfter =
totalRunningTransfers.decrementAndGet();
- logger.info("Removed transfer {} from queue with
transfer success = {}. Total running {}",
- id, transferSuccess, pendingAfter);
- } catch (Exception e) {
- logger.error("Failed while deleting scheduled path
for transfer {}", id);
- }
- });
-
-
- } catch (Throwable e) {
- if (request != null) {
- try {
- logger.error("Error in submitting transfer {}",
transferId, e);
-
- mftConsulClient.submitTransferStateToProcess(transferId,
agentId, new TransferState()
- .setState("FAILED")
- .setPercentage(0)
- .setUpdateTimeMils(System.currentTimeMillis())
- .setPublisher(agentId)
- .setDescription(ExceptionUtils.getStackTrace(e)));
- } catch (MFTConsulClientException ex) {
- logger.warn(ex.getMessage());
- // Ignore
- }
- } else {
- logger.error("Unknown error in processing message {}",
transferRequestJson, e);
- }
- } finally {
- //logger.info("Deleting key " + consulEntryKey);
- //mftConsulClient.getKvClient().deleteKey(consulEntryKey); // Due
to bug in consul https://github.com/hashicorp/consul/issues/571
- }
- }
- private void acceptTransferRequests() {
-
- transferCacheListener = newValues -> {
- newValues.values().forEach(value -> {
- Optional<String> decodedValue = value.getValueAsString();
- String transferId =
value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
- decodedValue.ifPresent(v -> {
- mftConsulClient.getKvClient().deleteKey(value.getKey());
- long totalPending =
totalPendingTransfers.incrementAndGet();
- logger.info("Total pending transfers {}", totalPending);
- transferRequestExecutor.submit(() ->
processTransfer(transferId, v));
- });
- });
- };
- transferMessageCache.addListener(transferCacheListener);
- transferMessageCache.start();
}
private void handleCallbacks(List<CallbackEndpoint> callbackEndpoints,
String transferId, TransferState transferState) {
@@ -340,103 +85,14 @@ public class MFTAgent implements CommandLineRunner {
}).start();
}
- private boolean connectAgent() throws MFTConsulClientException {
- final ImmutableSession session = ImmutableSession.builder()
- .name(agentId)
- .behavior("delete")
- .ttl(sessionTTLSeconds + "s").build();
-
- final SessionCreatedResponse sessResp =
mftConsulClient.getSessionClient().createSession(session);
- final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;
-
- boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath,
sessResp.getId());
-
- if (acquired) {
- this.session = sessResp.getId();
- sessionRenewPool.scheduleAtFixedRate(() -> {
- try {
-
mftConsulClient.getSessionClient().renewSession(sessResp.getId());
- } catch (ConsulException e) {
- if (e.getCode() == 404) {
- logger.error("Can not renew session as it is expired");
- stop();
- }
- logger.warn("Errored while renewing the session", e);
- try {
- boolean status =
mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
- if (!status) {
- logger.error("Can not renew session as it is
expired");
- stop();
- }
- } catch (Exception ex) {
- logger.error("Can not renew session as it is expired");
- stop();
- }
- } catch (Exception e) {
- try {
- boolean status =
mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
- if (!status) {
- logger.error("Can not renew session as it is
expired");
- stop();
- }
- } catch (Exception ex) {
- logger.error("Can not renew session as it is expired");
- stop();
- }
- }
- }, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
-
- this.mftConsulClient.registerAgent(new AgentInfo()
- .setId(agentId)
- .setHost(agentHost)
- .setUser(agentUser)
- .setSessionId(this.session)
-
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
- .setLocalStorages(new ArrayList<>()));
- }
-
- logger.info("Acquired lock " + acquired);
- return acquired;
- }
-
- public void disconnectAgent() {
- sessionRenewPool.shutdown();
- if (transferCacheListener != null) {
- transferMessageCache.removeListener(transferCacheListener);
- }
-
- if (rpcCacheListener != null) {
- rpcMessageCache.removeListener(rpcCacheListener);
- }
- }
-
+ @PreDestroy
public void stop() {
logger.info("Stopping Agent " + agentId);
- disconnectAgent();
mainHold.release();
- transferRequestExecutor.shutdown();
}
public void start() throws Exception {
init();
- boolean connected = false;
- int connectionRetries = 0;
- while (!connected) {
- connected = connectAgent();
- if (connected) {
- logger.info("Successfully connected to consul with session id
{}", session);
- } else {
- logger.info("Retrying to connect to consul");
- Thread.sleep(5000);
- connectionRetries++;
- if (connectionRetries > 10) {
- throw new Exception("Failed to connect to the cluster");
- }
- }
- }
-
- acceptTransferRequests();
- acceptRPCRequests();
acceptHTTPRequests();
}
diff --git
a/agent/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
b/agent/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
new file mode 100644
index 0000000..81c7e66
--- /dev/null
+++
b/agent/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent;
+
+import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+import org.apache.airavata.mft.core.MetadataCollectorResolver;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import
org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Queues transfer requests on a fixed-size thread pool and drives each one through
+ * metadata resolution, connector configuration and the {@link TransportMediator}.
+ * Progress and bookkeeping are reported through callbacks supplied by the submitter.
+ */
+public class TransferOrchestrator {
+
+    private static final Logger logger = LoggerFactory.getLogger(TransferOrchestrator.class);
+
+    // Bookkeeping counters; within this class they are only written to the log.
+    private final AtomicLong totalRunningTransfers = new AtomicLong(0);
+    private final AtomicLong totalPendingTransfers = new AtomicLong(0);
+
+    @org.springframework.beans.factory.annotation.Value("${agent.concurrent.transfers}")
+    private int concurrentTransfers;
+
+    private ExecutorService transferRequestExecutor;
+
+    private TransportMediator mediator;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.concurrent.chunked.threads}")
+    private int concurrentChunkedThreads;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
+    private int chunkedSize;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
+    private boolean doChunkStream;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
+    private String tempDataDir = "/tmp";
+
+    @org.springframework.beans.factory.annotation.Value("${resource.service.host}")
+    private String resourceServiceHost;
+
+    @org.springframework.beans.factory.annotation.Value("${resource.service.port}")
+    private int resourceServicePort;
+
+    @org.springframework.beans.factory.annotation.Value("${secret.service.host}")
+    private String secretServiceHost;
+
+    @org.springframework.beans.factory.annotation.Value("${secret.service.port}")
+    private int secretServicePort;
+
+    @Autowired
+    private StorageServiceClient storageServiceClient;
+
+    /**
+     * Creates the worker pool (sized by {@code agent.concurrent.transfers}) and the
+     * transport mediator once the configuration values have been injected.
+     */
+    @PostConstruct
+    public void init() {
+        transferRequestExecutor = Executors.newFixedThreadPool(concurrentTransfers);
+        mediator = new TransportMediator(tempDataDir,
+                concurrentTransfers,
+                concurrentChunkedThreads,
+                chunkedSize, doChunkStream);
+        logger.info("Transfer orchestrator initialized");
+    }
+
+    /** Stops accepting new transfer tasks; tasks already submitted are left to finish. */
+    @PreDestroy
+    public void destroy() {
+        transferRequestExecutor.shutdown();
+        logger.info("Transfer orchestrator turned off");
+    }
+
+    /**
+     * Queues a transfer for asynchronous processing on the worker pool.
+     *
+     * @param transferId         identifier of the transfer
+     * @param request            parsed transfer request
+     * @param updateStatus       receives (transferId, state) whenever the transfer state changes
+     * @param createTransferHook called with {@code true} right before the transfer is handed to
+     *                           the mediator and with {@code false} once the mediator reports
+     *                           completion
+     */
+    public void submitTransferToProcess(String transferId, TransferApiRequest request,
+                                        BiConsumer<String, TransferState> updateStatus,
+                                        Consumer<Boolean> createTransferHook) {
+        long totalPending = totalPendingTransfers.incrementAndGet();
+        logger.info("Total pending transfers {}", totalPending);
+        transferRequestExecutor.submit(() -> processTransfer(transferId, request, updateStatus, createTransferHook));
+    }
+
+    /**
+     * Processes a single transfer on the calling thread: publishes STARTING, resolves source and
+     * destination storage types, collects source metadata, builds both connector configurations,
+     * publishes STARTED and hands the transfer to the mediator. Any failure is logged and, when a
+     * request is available, published as a FAILED state; nothing is rethrown to the caller.
+     *
+     * @param transferId         identifier of the transfer
+     * @param request            parsed transfer request
+     * @param updateStatus       receives (transferId, state) whenever the transfer state changes
+     * @param createTransferHook called with {@code true} right before the transfer is handed to
+     *                           the mediator and with {@code false} once the mediator reports
+     *                           completion
+     */
+    public void processTransfer(String transferId, TransferApiRequest request,
+                                BiConsumer<String, TransferState> updateStatus,
+                                Consumer<Boolean> createTransferHook) {
+        // Becomes true right before the mediator is invoked; from then on the completion
+        // callback is responsible for decrementing the running counter.
+        boolean handedToMediator = false;
+        try {
+
+            long running = totalRunningTransfers.incrementAndGet();
+            long pending = totalPendingTransfers.decrementAndGet();
+            logger.info("Received request {}. Total Running {}. Total Pending {}", transferId, running, pending);
+
+            updateStatus.accept(transferId, new TransferState()
+                    .setState("STARTING")
+                    .setPercentage(0)
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setDescription("Starting the transfer"));
+
+            StorageTypeResolveResponse sourceStorageType = storageServiceClient.common()
+                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
+                            .setStorageId(request.getSourceStorageId()).build());
+            Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver
+                    .resolveMetadataCollector(sourceStorageType.getStorageType());
+            MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
+            srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+            // The destination type must be resolved from the destination storage id; using the
+            // source id here would configure the destination connector with the source's type.
+            StorageTypeResolveResponse destStorageType = storageServiceClient.common()
+                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
+                            .setStorageId(request.getDestinationStorageId()).build());
+
+            FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+                    request.getMftAuthorizationToken(),
+                    request.getSourcePath(),
+                    request.getSourceStorageId(),
+                    request.getSourceToken());
+
+
+            ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                    .withAuthToken(request.getMftAuthorizationToken())
+                    .withResourceServiceHost(resourceServiceHost)
+                    .withResourceServicePort(resourceServicePort)
+                    .withSecretServiceHost(secretServiceHost)
+                    .withSecretServicePort(secretServicePort)
+                    .withTransferId(transferId)
+                    .withStorageId(request.getSourceStorageId())
+                    .withResourcePath(request.getSourcePath())
+                    .withStorageType(sourceStorageType.getStorageType())
+                    .withCredentialToken(request.getSourceToken())
+                    .withMetadata(srcMetadata).build();
+
+            ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                    .withAuthToken(request.getMftAuthorizationToken())
+                    .withResourceServiceHost(resourceServiceHost)
+                    .withResourceServicePort(resourceServicePort)
+                    .withSecretServiceHost(secretServiceHost)
+                    .withSecretServicePort(secretServicePort)
+                    .withTransferId(transferId)
+                    .withStorageId(request.getDestinationStorageId())
+                    .withResourcePath(request.getDestinationPath())
+                    .withStorageType(destStorageType.getStorageType())
+                    .withCredentialToken(request.getDestinationToken())
+                    .withMetadata(srcMetadata).build();
+
+            updateStatus.accept(transferId, new TransferState()
+                    .setState("STARTED")
+                    .setPercentage(0)
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setDescription("Started the transfer"));
+
+            // Save transfer metadata in scheduled path to recover in case of an agent failure. Recovery is done from controller
+            createTransferHook.accept(true);
+
+            handedToMediator = true;
+            mediator.transferSingleThread(transferId, request, srcCC, dstCC, updateStatus,
+                    (id, transferSuccess) -> {
+                        try {
+                            // Delete scheduled key as the transfer completed / failed if it was placed in current session
+                            createTransferHook.accept(false);
+                        } catch (Exception e) {
+                            logger.error("Failed while deleting scheduled path for transfer {}", id, e);
+                        }
+                        // Decrement outside the try so a failing hook cannot leak the counter.
+                        long runningAfter = totalRunningTransfers.decrementAndGet();
+                        logger.info("Removed transfer {} from queue with transfer success = {}. Total running {}",
+                                id, transferSuccess, runningAfter);
+                    });
+
+
+        } catch (Throwable e) {
+            if (!handedToMediator) {
+                // The completion callback will never run for this transfer, so undo the
+                // increment made at the top of this method.
+                totalRunningTransfers.decrementAndGet();
+            }
+
+            if (request != null) {
+                logger.error("Error in submitting transfer {}", transferId, e);
+
+                try {
+                    updateStatus.accept(transferId, new TransferState()
+                            .setState("FAILED")
+                            .setPercentage(0)
+                            .setUpdateTimeMils(System.currentTimeMillis())
+                            .setDescription(ExceptionUtils.getStackTrace(e)));
+                } catch (RuntimeException ex) {
+                    // The status callback may itself fail; this runs on a pool thread where an
+                    // escaping exception would be lost, so log it here.
+                    logger.warn("Failed to publish FAILED state for transfer {}", transferId, ex);
+                }
+
+            } else {
+                // request is null on this branch, so only the transfer id can be reported.
+                logger.error("Unknown error in processing message {}", transferId, e);
+            }
+        }
+    }
+
+}
diff --git
a/agent/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java
b/agent/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java
new file mode 100644
index 0000000..f3ffdd2
--- /dev/null
+++
b/agent/src/main/java/org/apache/airavata/mft/agent/ingress/ConsulIngressHandler.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.ingress;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.util.JsonFormat;
+import com.orbitz.consul.ConsulException;
+import com.orbitz.consul.cache.ConsulCache;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionCreatedResponse;
+import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
+import org.apache.airavata.mft.agent.AgentUtil;
+import org.apache.airavata.mft.agent.TransferOrchestrator;
+import org.apache.airavata.mft.agent.rpc.RPCParser;
+import org.apache.airavata.mft.api.service.TransferApiRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Consul based ingress point of an MFT agent.
+ *
+ * <p>On start-up ({@link #init()}) the handler creates a Consul session, acquires the lock stored under
+ * {@code MFTConsulClient.LIVE_AGENTS_PATH + agentId} with it, keeps the session renewed on a
+ * single-threaded scheduler and registers the agent. It then watches two KV prefixes:
+ * <ul>
+ *   <li>transfer request messages, which are parsed into {@link TransferApiRequest}s and handed to the
+ *       {@link TransferOrchestrator};</li>
+ *   <li>synchronous RPC request messages, which are processed by the {@link RPCParser} and answered
+ *       through the {@link MFTConsulClient}.</li>
+ * </ul>
+ */
+public class ConsulIngressHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConsulIngressHandler.class);
+
+    /** Error code of a {@link ConsulException} that this class treats as "session expired". */
+    private static final int SESSION_NOT_FOUND_CODE = 404;
+
+    /** Number of additional connection attempts made by {@link #init()} after the first failure. */
+    private static final int MAX_CONNECTION_RETRIES = 10;
+
+    /** Pause between two connection attempts, in milliseconds. */
+    private static final long CONNECTION_RETRY_DELAY_MS = 5000;
+
+    private KVCache transferMessageCache;
+    private KVCache rpcMessageCache;
+
+    private ConsulCache.Listener<String, Value> transferCacheListener;
+    private ConsulCache.Listener<String, Value> rpcCacheListener;
+
+    @Autowired
+    private MFTConsulClient mftConsulClient;
+
+    @Autowired
+    private RPCParser rpcParser;
+
+    /** Id of the Consul session that currently holds the live-agent lock; set by {@link #connectAgent()}. */
+    private String session;
+
+    private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
+
+    private long sessionRenewSeconds = 4;
+    private long sessionTTLSeconds = 10;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    // The fully qualified Spring annotation is used below because the simple name "Value"
+    // is already used in this file for the Consul KV value type.
+    @org.springframework.beans.factory.annotation.Value("${agent.id}")
+    private String agentId;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.secret}")
+    private String agentSecret;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.host}")
+    private String agentHost;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.user}")
+    private String agentUser;
+
+    @org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
+    private String supportedProtocols;
+
+    @Autowired
+    private TransferOrchestrator transferOrchestrator;
+
+    /**
+     * Registers the listener that consumes transfer request messages and starts the transfer cache.
+     *
+     * <p>For every value delivered by the cache the key is deleted first, then the JSON payload is
+     * parsed into a {@link TransferApiRequest} and submitted to the {@link TransferOrchestrator}.
+     * Messages that fail to parse are logged and dropped.
+     */
+    private void acceptTransferRequests() {
+
+        transferCacheListener = newValues -> newValues.values().forEach(value -> {
+            Optional<String> decodedValue = value.getValueAsString();
+            // The last path segment of the key is used as the transfer id.
+            String transferId = value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
+
+            decodedValue.ifPresent(reqJson -> {
+                mftConsulClient.getKvClient().deleteKey(value.getKey());
+
+                TransferApiRequest.Builder builder = TransferApiRequest.newBuilder();
+                try {
+                    JsonFormat.parser().merge(reqJson, builder);
+                } catch (Exception e) {
+                    // Pass the exception as the last argument so the parse failure cause is not lost.
+                    logger.error("Failed to parse json string {} into a transfer request", reqJson, e);
+                    return;
+                }
+
+                TransferApiRequest request = builder.build();
+                transferOrchestrator.submitTransferToProcess(transferId, request,
+                        // Publishes state updates of the transfer on behalf of this agent.
+                        AgentUtil.throwingBiConsumerWrapper((id, st) -> {
+                            mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
+                        }),
+                        // Adds (create == true) or removes the entry kept for this transfer under
+                        // the scheduled path of this agent and session.
+                        AgentUtil.throwingConsumerWrapper(create -> {
+                            String scheduledKey = MFTConsulClient.AGENTS_SCHEDULED_PATH
+                                    + agentId + "/" + session + "/" + transferId;
+                            if (create) {
+                                mftConsulClient.getKvClient().putValue(scheduledKey, reqJson);
+                            } else {
+                                mftConsulClient.getKvClient().deleteKey(scheduledKey);
+                            }
+                        }));
+            });
+        });
+
+        transferMessageCache.addListener(transferCacheListener);
+        transferMessageCache.start();
+    }
+
+    /**
+     * Registers the listener that consumes synchronous RPC request messages and starts the RPC cache.
+     *
+     * <p>Each request is deserialized, processed by the {@link RPCParser} and the response is sent to
+     * the return address of the request. The request key is deleted whether or not processing
+     * succeeded; failures are logged.
+     */
+    private void acceptRPCRequests() {
+        rpcCacheListener = newValues -> newValues.values().forEach(value -> {
+            Optional<String> decodedValue = value.getValueAsString();
+            decodedValue.ifPresent(v -> {
+                try {
+                    SyncRPCRequest rpcRequest = mapper.readValue(v, SyncRPCRequest.class);
+                    SyncRPCResponse syncRPCResponse = rpcParser.processRPCRequest(rpcRequest);
+                    mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), syncRPCResponse);
+                } catch (Throwable e) {
+                    logger.error("Error processing the RPC request {}", value.getKey(), e);
+                } finally {
+                    mftConsulClient.getKvClient().deleteKey(value.getKey());
+                }
+            });
+        });
+
+        rpcMessageCache.addListener(rpcCacheListener);
+        rpcMessageCache.start();
+    }
+
+    /**
+     * Creates a new Consul session and tries to acquire the live-agent lock of this agent with it.
+     *
+     * <p>When the lock is acquired the session id is stored, a periodic session renewal is scheduled
+     * and the agent is registered through the {@link MFTConsulClient}.
+     *
+     * @return {@code true} if the lock was acquired, {@code false} otherwise
+     * @throws MFTConsulClientException propagated from the agent registration
+     */
+    private boolean connectAgent() throws MFTConsulClientException {
+        final ImmutableSession sessionSpec = ImmutableSession.builder()
+                .name(agentId)
+                .behavior("delete")
+                .ttl(sessionTTLSeconds + "s").build();
+
+        final SessionCreatedResponse sessResp = mftConsulClient.getSessionClient().createSession(sessionSpec);
+        final String sessionId = sessResp.getId();
+        final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;
+
+        boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath, sessionId);
+
+        if (acquired) {
+            this.session = sessionId;
+            sessionRenewPool.scheduleAtFixedRate(() -> renewSession(lockPath, sessionId),
+                    sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
+
+            this.mftConsulClient.registerAgent(new AgentInfo()
+                    .setId(agentId)
+                    .setHost(agentHost)
+                    .setUser(agentUser)
+                    .setSessionId(this.session)
+                    .setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
+                    .setLocalStorages(new ArrayList<>()));
+        }
+
+        logger.info("Acquired lock {}", acquired);
+        return acquired;
+    }
+
+    /**
+     * One run of the periodic session renewal.
+     *
+     * <p>If the renewal fails with the "session not found" code the handler is shut down right away,
+     * without any further attempt on the lock. For every other failure the lock is re-acquired with
+     * the same session id, and the handler is shut down only if that fails as well. This method
+     * never throws, so the periodic schedule is not cancelled by an exception.
+     *
+     * @param lockPath  KV path of the live-agent lock
+     * @param sessionId id of the session to renew
+     */
+    private void renewSession(String lockPath, String sessionId) {
+        try {
+            mftConsulClient.getSessionClient().renewSession(sessionId);
+        } catch (ConsulException e) {
+            if (e.getCode() == SESSION_NOT_FOUND_CODE) {
+                logger.error("Can not renew session as it is expired");
+                destroy();
+                // The session is gone; trying to re-acquire the lock with it is pointless.
+                return;
+            }
+            logger.warn("Errored while renewing the session", e);
+            reacquireLockOrShutdown(lockPath, sessionId);
+        } catch (Exception e) {
+            logger.warn("Errored while renewing the session", e);
+            reacquireLockOrShutdown(lockPath, sessionId);
+        }
+    }
+
+    /**
+     * Tries to acquire the live-agent lock again and shuts the handler down when that is not possible.
+     *
+     * @param lockPath  KV path of the live-agent lock
+     * @param sessionId id of the session used to acquire the lock
+     */
+    private void reacquireLockOrShutdown(String lockPath, String sessionId) {
+        try {
+            boolean status = mftConsulClient.getKvClient().acquireLock(lockPath, sessionId);
+            if (!status) {
+                logger.error("Can not renew session as it is expired");
+                destroy();
+            }
+        } catch (Exception ex) {
+            logger.error("Can not renew session as it is expired", ex);
+            destroy();
+        }
+    }
+
+    /**
+     * Creates the KV caches, connects the agent to Consul and starts accepting transfer and RPC
+     * requests.
+     *
+     * <p>The connection is attempted once and retried up to {@link #MAX_CONNECTION_RETRIES} more
+     * times, pausing {@link #CONNECTION_RETRY_DELAY_MS} milliseconds between attempts.
+     *
+     * @throws Exception if the agent could not be connected after all attempts, if connecting itself
+     *                   failed, or if the thread was interrupted while waiting for the next attempt
+     */
+    @PostConstruct
+    public void init() throws Exception {
+        transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(),
+                MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
+        rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(),
+                MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
+
+        int failedAttempts = 0;
+        while (!connectAgent()) {
+            failedAttempts++;
+            // Give up before sleeping again; there is no point in waiting ahead of a failure.
+            if (failedAttempts > MAX_CONNECTION_RETRIES) {
+                throw new Exception("Failed to connect to the cluster");
+            }
+            logger.info("Retrying to connect to consul");
+            try {
+                Thread.sleep(CONNECTION_RETRY_DELAY_MS);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for the caller before propagating.
+                Thread.currentThread().interrupt();
+                throw e;
+            }
+        }
+        logger.info("Successfully connected to consul with session id {}", session);
+
+        acceptTransferRequests();
+        acceptRPCRequests();
+        logger.info("Consul ingress handler initialized");
+
+    }
+
+    /**
+     * Stops the session renewal and detaches the transfer and RPC listeners from their caches.
+     *
+     * <p>Invoked on bean destruction and also from the renewal task when the session or the lock is
+     * lost, so it may run more than once.
+     *
+     * <p>NOTE(review): the KV caches themselves are not stopped here and the Consul session is not
+     * explicitly destroyed; confirm whether that is intended.
+     */
+    @PreDestroy
+    public void destroy() {
+        if (!sessionRenewPool.isShutdown()) {
+            sessionRenewPool.shutdown();
+        }
+
+        if (transferCacheListener != null) {
+            transferMessageCache.removeListener(transferCacheListener);
+        }
+
+        if (rpcCacheListener != null) {
+            rpcMessageCache.removeListener(rpcCacheListener);
+        }
+        logger.info("Consul ingress handler turned off");
+    }
+
+}