This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit a57b554e5c5c7b419655fbbc6e8d21dc29694247 Author: Gopi Kiran <[email protected]> AuthorDate: Wed Apr 29 16:13:34 2020 -0400 FTP transport implementation --- transport/ftp-transport/pom.xml | 49 ++++++++++ .../mft/transport/ftp/FTPMetadataCollector.java | 106 +++++++++++++++++++++ .../airavata/mft/transport/ftp/FTPReceiver.java | 89 +++++++++++++++++ .../airavata/mft/transport/ftp/FTPSender.java | 91 ++++++++++++++++++ .../mft/transport/ftp/FTPTransportUtil.java | 35 +++++++ 5 files changed, 370 insertions(+) diff --git a/transport/ftp-transport/pom.xml b/transport/ftp-transport/pom.xml new file mode 100644 index 0000000..347cbc0 --- /dev/null +++ b/transport/ftp-transport/pom.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>mft-transport</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.01-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>mft-ftp-transport</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-core</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>3.6</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
new file mode 100644
index 0000000..8b7c9df
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -0,0 +1,144 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+ * Collects size, modification time and (when the server advertises it) an MD5 checksum for a
+ * resource stored on an FTP server, and checks whether such a resource can be retrieved.
+ */
+public class FTPMetadataCollector implements MetadataCollector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPMetadataCollector.class);
+
+    private String resourceServiceHost;
+    private int resourceServicePort;
+    private String secretServiceHost;
+    private int secretServicePort;
+    private boolean initialized = false;
+
+    /** Stores the resource and secret service endpoints used by the lookup methods. */
+    @Override
+    public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) {
+        this.resourceServiceHost = resourceServiceHost;
+        this.resourceServicePort = resourceServicePort;
+        this.secretServiceHost = secretServiceHost;
+        this.secretServicePort = secretServicePort;
+        this.initialized = true;
+    }
+
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("FTP Metadata Collector is not initialized");
+        }
+    }
+
+    /** Looks up the FTP resource definition from the resource service. */
+    private FTPResource fetchResource(String resourceId) {
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        return resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+    }
+
+    /** Looks up the FTP credentials from the secret service. */
+    private FTPSecret fetchSecret(String credentialToken) {
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        return secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+    }
+
+    /**
+     * Fetches metadata for the given resource. FTP failures are logged rather than thrown, in
+     * which case the returned metadata holds only the fields collected before the failure.
+     *
+     * @param resourceId id of the FTP resource registered in the resource service
+     * @param credentialToken id of the FTP secret registered in the secret service
+     * @return the collected metadata; never null
+     */
+    @Override
+    public ResourceMetadata getGetResourceMetadata(String resourceId, String credentialToken) {
+
+        checkInitialized();
+        FTPResource ftpResource = fetchResource(resourceId);
+        FTPSecret ftpSecret = fetchSecret(credentialToken);
+
+        ResourceMetadata resourceMetadata = new ResourceMetadata();
+        FTPClient ftpClient = null;
+        try {
+            ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+            logger.info("Fetching metadata for resource {} in {}", ftpResource.getResourcePath(), ftpResource.getFtpStorage().getHost());
+
+            FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getResourcePath());
+            if (ftpFile == null) {
+                logger.warn("No file entry found for FTP resource {}: {}", resourceId, ftpClient.getReplyString());
+                return resourceMetadata;
+            }
+
+            resourceMetadata.setResourceSize(ftpFile.getSize());
+            // getTimestamp() may be null; skip the field instead of failing the whole lookup with an NPE.
+            if (ftpFile.getTimestamp() != null) {
+                resourceMetadata.setUpdateTime(ftpFile.getTimestamp().getTimeInMillis());
+            }
+
+            if (!ftpClient.hasFeature("MD5")) {
+                logger.warn("FTP server does not advertise MD5 support; skipping md5 for resource {}", resourceId);
+            } else if (FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getResourcePath()))) {
+                String[] replies = ftpClient.getReplyStrings();
+                if (replies != null && replies.length > 0) {
+                    resourceMetadata.setMd5sum(replies[0]);
+                }
+            } else {
+                logger.warn("MD5 fetch error out {}", ftpClient.getReplyString());
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to fetch metadata for FTP resource {}", resourceId, e);
+        } finally {
+            FTPTransportUtil.disconnectFTP(ftpClient);
+        }
+
+        return resourceMetadata;
+    }
+
+    /**
+     * Checks whether the resource can be opened for retrieval on the FTP server.
+     *
+     * @param resourceId id of the FTP resource registered in the resource service
+     * @param credentialToken id of the FTP secret registered in the secret service
+     * @return true if a data stream could be opened; false if not or if the FTP call failed
+     */
+    @Override
+    public Boolean isAvailable(String resourceId, String credentialToken) {
+
+        checkInitialized();
+        FTPResource ftpResource = fetchResource(resourceId);
+        FTPSecret ftpSecret = fetchSecret(credentialToken);
+
+        FTPClient ftpClient = null;
+        try {
+            ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+            // Close the probe stream so the data connection is not left open until disconnect.
+            try (InputStream inputStream = ftpClient.retrieveFileStream(ftpResource.getResourcePath())) {
+                return !(inputStream == null || ftpClient.getReplyCode() == 550);
+            }
+        } catch (Exception e) {
+            logger.error("FTP availability check failed for resource {}", resourceId, e);
+            return false;
+        } finally {
+            FTPTransportUtil.disconnectFTP(ftpClient);
+        }
+    }
+}
+
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
new file mode 100644
index 0000000..46909dd
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -0,0 +1,104 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Connector that downloads a file from an FTP server and writes its bytes into the transfer's
+ * stream buffer.
+ */
+public class FTPReceiver implements Connector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
+    private static final int BUFFER_SIZE = 1024;
+
+    private FTPResource resource;
+    private boolean initialized;
+    private FTPClient ftpClient;
+
+    /**
+     * Resolves the FTP resource and secret and opens a logged-in FTP connection.
+     *
+     * @throws Exception if a lookup or the FTP connection fails; the receiver then stays uninitialized
+     */
+    @Override
+    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.resource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+        // Mark as initialized only once the client exists, so a failed init cannot expose a null client to startStream.
+        this.initialized = true;
+    }
+
+    /** Logs out and disconnects the FTP client, if one was created. */
+    @Override
+    public void destroy() {
+        FTPTransportUtil.disconnectFTP(ftpClient);
+    }
+
+    /**
+     * Copies the remote file into the context's stream buffer, reading at most the number of
+     * bytes given by the context metadata.
+     *
+     * @throws IOException if the remote file cannot be opened or the server does not confirm the transfer
+     */
+    @Override
+    public void startStream(ConnectorContext context) throws Exception {
+        logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
+
+        checkInitialized();
+        long remaining = context.getMetadata().getResourceSize();
+
+        try (OutputStream streamOs = context.getStreamBuffer().getOutputStream()) {
+            try (InputStream inputStream = ftpClient.retrieveFileStream(resource.getResourcePath())) {
+                // retrieveFileStream signals failure by returning null rather than throwing.
+                if (inputStream == null) {
+                    throw new IOException("Failed to open FTP resource " + resource.getResourcePath() + " for reading: " + ftpClient.getReplyString());
+                }
+
+                byte[] buf = new byte[BUFFER_SIZE];
+                while (remaining > 0) {
+                    int read = inputStream.read(buf, 0, (int) Math.min(buf.length, remaining));
+                    if (read < 0) {
+                        break;
+                    }
+                    streamOs.write(buf, 0, read);
+                    streamOs.flush();
+                    remaining -= read;
+                }
+            }
+
+            // The data stream must be closed before the transfer is finalized on the control connection.
+            if (!ftpClient.completePendingCommand()) {
+                throw new IOException("FTP server did not confirm retrieval of " + resource.getResourcePath() + ": " + ftpClient.getReplyString());
+            }
+        }
+
+        logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+    }
+
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("FTP Receiver is not initialized");
+        }
+    }
+}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
new file mode 100644
index 0000000..41d4b8e
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -0,0 +1,104 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Connector that reads bytes from the transfer's stream buffer and uploads them to a file on an
+ * FTP server.
+ */
+public class FTPSender implements Connector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPSender.class);
+    private static final int BUFFER_SIZE = 1024;
+
+    private FTPResource resource;
+    private boolean initialized;
+    private FTPClient ftpClient;
+
+    /**
+     * Resolves the FTP resource and secret and opens a logged-in FTP connection.
+     *
+     * @throws Exception if a lookup or the FTP connection fails; the sender then stays uninitialized
+     */
+    @Override
+    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.resource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+        // Mark as initialized only once the client exists, so a failed init cannot expose a null client to startStream.
+        this.initialized = true;
+    }
+
+    /** Logs out and disconnects the FTP client, if one was created. */
+    @Override
+    public void destroy() {
+        FTPTransportUtil.disconnectFTP(ftpClient);
+    }
+
+    /**
+     * Uploads the content of the context's stream buffer to the remote file, writing at most the
+     * number of bytes given by the context metadata.
+     *
+     * @throws IOException if the remote file cannot be opened or the server does not confirm the transfer
+     */
+    @Override
+    public void startStream(ConnectorContext context) throws Exception {
+        logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
+
+        checkInitialized();
+        long remaining = context.getMetadata().getResourceSize();
+
+        try (InputStream in = context.getStreamBuffer().getInputStream()) {
+            try (OutputStream outputStream = ftpClient.storeFileStream(resource.getResourcePath())) {
+                // storeFileStream signals failure by returning null rather than throwing.
+                if (outputStream == null) {
+                    throw new IOException("Failed to open FTP resource " + resource.getResourcePath() + " for writing: " + ftpClient.getReplyString());
+                }
+
+                byte[] buf = new byte[BUFFER_SIZE];
+                while (remaining > 0) {
+                    int read = in.read(buf, 0, (int) Math.min(buf.length, remaining));
+                    if (read < 0) {
+                        break;
+                    }
+                    outputStream.write(buf, 0, read);
+                    outputStream.flush();
+                    remaining -= read;
+                }
+            }
+
+            // The data stream must be closed before the transfer is finalized on the control connection.
+            if (!ftpClient.completePendingCommand()) {
+                throw new IOException("FTP server did not confirm upload of " + resource.getResourcePath() + ": " + ftpClient.getReplyString());
+            }
+        }
+
+        logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
+    }
+
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("FTP Sender is not initialized");
+        }
+    }
+}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
new file mode 100644
index 0000000..81608a4
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
@@ -0,0 +1,77 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Helpers for opening and closing the FTP connections used by the FTP transport. */
+final class FTPTransportUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPTransportUtil.class);
+
+    private FTPTransportUtil() {
+    }
+
+    /**
+     * Connects to the FTP storage of the given resource, logs in and switches to binary transfers.
+     *
+     * @param ftpResource resource whose FTP storage host and port are used
+     * @param ftpSecret credentials used to log in
+     * @return a connected, logged-in client; the caller must release it with {@link #disconnectFTP}
+     * @throws IOException if the connection is refused, the login is rejected or binary mode cannot be set
+     */
+    static FTPClient getFTPClient(FTPResource ftpResource, FTPSecret ftpSecret) throws IOException {
+
+        FTPClient ftpClient = new FTPClient();
+        try {
+            ftpClient.connect(ftpResource.getFtpStorage().getHost(), ftpResource.getFtpStorage().getPort());
+            if (!FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
+                throw new IOException("FTP server refused connection: " + ftpClient.getReplyString());
+            }
+            ftpClient.enterLocalActiveMode();
+            if (!ftpClient.login(ftpSecret.getUserId(), ftpSecret.getPassword())) {
+                throw new IOException("FTP login failed: " + ftpClient.getReplyString());
+            }
+            // FTPClient defaults to ASCII transfers, which rewrite line endings and corrupt binary files.
+            if (!ftpClient.setFileType(FTP.BINARY_FILE_TYPE)) {
+                throw new IOException("Failed to switch FTP connection to binary mode: " + ftpClient.getReplyString());
+            }
+            return ftpClient;
+        } catch (IOException | RuntimeException e) {
+            // Do not leak the socket when setup fails part-way.
+            disconnectFTP(ftpClient);
+            throw e;
+        }
+    }
+
+    /**
+     * Logs out and disconnects the client. Failures are logged and never thrown.
+     *
+     * @param ftpClient client to close; may be null
+     */
+    static void disconnectFTP(FTPClient ftpClient) {
+        if (ftpClient == null) {
+            return;
+        }
+        try {
+            if (ftpClient.isConnected()) {
+                ftpClient.logout();
+            }
+        } catch (Exception e) {
+            logger.error("FTP client close operation failed", e);
+        } finally {
+            // Always release the socket, even when logout fails.
+            try {
+                ftpClient.disconnect();
+            } catch (Exception e) {
+                logger.error("FTP client close operation failed", e);
+            }
+        }
+    }
+}
