This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 319c424 Support to initiate transfers through CLI
319c424 is described below
commit 319c424224a8fbeffab2dad543ad493fa2e039af
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sun Dec 25 06:52:28 2022 -0500
Support to initiate transfers through CLI
---
.../airavata/mft/api/handler/MFTApiHandler.java | 2 +-
api/stub/src/main/proto/MFTTransferApi.proto | 8 +-
.../sub/transfer/SubmitTransferSubCommand.java | 4 +-
.../mft/admin/ControllerRequestBuilder.java | 4 +-
python-cli/README.md | 2 +-
python-cli/mft_cli/mft_cli/main.py | 133 +++++++++++++++++++--
python-sdk/setup.cfg | 2 +-
.../src/airavata_mft_sdk/MFTTransferApi_pb2.py | 60 +++++-----
8 files changed, 164 insertions(+), 51 deletions(-)
diff --git
a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 1ae2139..a786b6d 100644
---
a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++
b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -213,7 +213,7 @@ public class MFTApiHandler extends
MFTTransferServiceGrpc.MFTTransferServiceImpl
.withMethod("submitHttpDownload")
.withParameter("resourcePath", request.getResourcePath())
.withParameter("sourceStorageId",
request.getSourceStorageId())
- .withParameter("sourceToken", request.getSourceToken())
+ .withParameter("sourceToken", request.getSourceSecretId())
.withParameter("mftAuthorizationToken",
JsonFormat.printer().print(request.getMftAuthorizationToken()));
SyncRPCResponse rpcResponse =
agentRPCClient.sendSyncRequest(requestBuilder.build());
diff --git a/api/stub/src/main/proto/MFTTransferApi.proto
b/api/stub/src/main/proto/MFTTransferApi.proto
index ca21459..813a203 100644
--- a/api/stub/src/main/proto/MFTTransferApi.proto
+++ b/api/stub/src/main/proto/MFTTransferApi.proto
@@ -18,10 +18,10 @@ message CallbackEndpoint {
message TransferApiRequest {
string sourcePath = 1;
string sourceStorageId = 2;
- string sourceToken = 3;
+ string sourceSecretId = 3;
string destinationPath = 4;
string destinationStorageId = 5;
- string destinationToken = 6;
+ string destinationSecretId = 6;
bool affinityTransfer = 7;
map<string, int32> targetAgents = 8;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 9;
@@ -43,7 +43,7 @@ message BatchTransferApiResponse {
message HttpUploadApiRequest {
string destinationStorageId = 1;
string resourcePath = 2;
- string destinationToken = 3;
+ string destinationSecretId = 3;
string targetAgent = 5;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 6;
}
@@ -56,7 +56,7 @@ message HttpUploadApiResponse {
message HttpDownloadApiRequest {
string resourcePath = 1;
string sourceStorageId = 2;
- string sourceToken = 3;
+ string sourceSecretId = 3;
string targetAgent = 5;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 6;
}
diff --git
a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
index a873b1b..2787f62 100644
---
a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
+++
b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
@@ -52,8 +52,8 @@ public class SubmitTransferSubCommand implements
Callable<Integer> {
}
TransferApiResponse transferResp =
mftApiClient.getTransferClient().submitTransfer(TransferApiRequest.newBuilder()
- .setSourceToken(sourceSecretForStorage.getSecretId())
- .setDestinationToken(destSecretForStorage.getSecretId())
+ .setSourceSecretId(sourceSecretForStorage.getSecretId())
+ .setDestinationSecretId(destSecretForStorage.getSecretId())
.setDestinationStorageId(destinationStorageId)
.setDestinationPath(destinationPath)
.setSourceStorageId(sourceStorageId)
diff --git
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/ControllerRequestBuilder.java
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/ControllerRequestBuilder.java
index 86e4830..d879a0b 100644
---
a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/ControllerRequestBuilder.java
+++
b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/ControllerRequestBuilder.java
@@ -69,13 +69,13 @@ public class ControllerRequestBuilder {
agentTransferBuilder.setSourcePath(transferRequest.getSourcePath());
agentTransferBuilder.setDestinationPath(transferRequest.getDestinationPath());
Pair<StorageWrapper, SecretWrapper> sourceCred =
createCredentials(transferRequest.getSourceStorageId(),
- transferRequest.getSourceToken());
+ transferRequest.getSourceSecretId());
agentTransferBuilder.setSourceStorage(sourceCred.getLeft());
agentTransferBuilder.setSourceSecret(sourceCred.getRight());
Pair<StorageWrapper, SecretWrapper> destCred =
createCredentials(transferRequest.getDestinationStorageId(),
- transferRequest.getDestinationToken());
+ transferRequest.getDestinationSecretId());
agentTransferBuilder.setDestinationStorage(destCred.getLeft());
agentTransferBuilder.setDestinationSecret(destCred.getRight());
diff --git a/python-cli/README.md b/python-cli/README.md
index 2474765..191450d 100644
--- a/python-cli/README.md
+++ b/python-cli/README.md
@@ -18,7 +18,7 @@ Install dependencies
```
pip install grpcio==1.46.3
pip install grpcio-tools==1.46.3
-pip install airavata_mft_sdk==0.0.1-alpha18
+pip install airavata_mft_sdk==0.0.1-alpha19
```
Build the binary
diff --git a/python-cli/mft_cli/mft_cli/main.py
b/python-cli/mft_cli/mft_cli/main.py
index db1109d..2c2fccd 100644
--- a/python-cli/mft_cli/mft_cli/main.py
+++ b/python-cli/mft_cli/mft_cli/main.py
@@ -5,15 +5,14 @@ from airavata_mft_sdk.common import StorageCommon_pb2
from airavata_mft_sdk import MFTTransferApi_pb2
from rich.console import Console
from rich.table import Table
+from rich.progress import track
+import time
app = typer.Typer()
app.add_typer(mft_cli.storage.app, name="storage")
-@app.command("ls")
-def list(storage_path):
- storage_name = storage_path.split("/")[0]
- resource_path = storage_path[len(storage_name) +1 :]
+def fetch_storage_and_secret_ids(storage_name):
client = mft_client.MFTClient()
search_req =
StorageCommon_pb2.StorageSearchRequest(storageName=storage_name)
storages = client.common_api.searchStorages(search_req)
@@ -24,11 +23,11 @@ def list(storage_path):
if len(storages.storageList) == 0:
print("No storage with name or id " + storage_name + " was found.
Please register the storage with command mft-cli storage add")
- exit()
+ raise typer.Abort()
if len(storages.storageList) > 1:
print("More than one storage with nam " + storage_name + " was found.
Please use the storage id. You can fetch it from mft-cli storage list")
- exit()
+ raise typer.Abort()
storage = storages.storageList[0]
sec_req = StorageCommon_pb2.SecretForStorageGetRequest(storageId =
storage.storageId)
@@ -36,12 +35,26 @@ def list(storage_path):
if sec_resp.error != 0:
print("Could not fetch the secret for storage " + storage.storageId)
- id_req = MFTTransferApi_pb2.GetResourceMetadataFromIDsRequest(storageId =
sec_resp.storageId,
-
secretId = sec_resp.secretId,
-
resourcePath = resource_path)
+ return sec_resp.storageId, sec_resp.secretId
+def get_resource_metadata(storage_path, recursive_search = False):
+ """Fetch the metadata of the resource addressed by storage_path.
+
+ storage_path is split at its first "/": the leading segment is passed to
+ fetch_storage_and_secret_ids() as the storage name, and everything after
+ that segment and its separator is used as the resource path.
+
+ Returns the response of client.transfer_api.resourceMetadata() for a
+ FetchResourceMetadataRequest built from the resolved storage id, secret
+ id and resource path.
+
+ NOTE(review): recursive_search is accepted but not read anywhere in this
+ function.
+ """
+ storage_name = storage_path.split("/")[0]
+ resource_path = storage_path[len(storage_name) +1 :]
+
+ storage_id, secret_id = fetch_storage_and_secret_ids(storage_name)
+
+ id_req = MFTTransferApi_pb2.GetResourceMetadataFromIDsRequest(storageId =
storage_id,
+ secretId =
secret_id,
+ resourcePath
= resource_path)
resource_medata_req =
MFTTransferApi_pb2.FetchResourceMetadataRequest(idRequest = id_req)
+ client = mft_client.MFTClient()
+
metadata_resp = client.transfer_api.resourceMetadata(resource_medata_req)
+ return metadata_resp
+@app.command("ls")
+def list(storage_path):
+
+ metadata_resp = get_resource_metadata(storage_path)
console = Console()
table = Table()
@@ -65,10 +78,110 @@ def list(storage_path):
console.print(table)
+def flatten_directories(directory, parent_path, file_list):
+    """Collect every file below directory into file_list, depth first.
+
+    Each entry appended to file_list is a (file, relative_path) tuple, where
+    relative_path is parent_path followed by the friendlyName of every
+    directory walked through and finally the file's own friendlyName.
+    Files of nested directories are appended before the files that sit
+    directly in directory.
+    """
+    for sub_directory in directory.directories:
+        nested_prefix = parent_path + sub_directory.friendlyName + "/"
+        flatten_directories(sub_directory, nested_prefix, file_list)
+
+    file_list.extend((entry, parent_path + entry.friendlyName) for entry in directory.files)
+
@app.command("cp")
def copy(source, destination):
- print("Moving data from " + source + " to " + destination)
+    """Copy a file or a directory tree from SOURCE to DESTINATION.
+
+    Both arguments have the form <storage name or id>/<resource path>. When
+    SOURCE is a directory, DESTINATION must end with "/" and every file found
+    below SOURCE is transferred under it. When SOURCE is a file and
+    DESTINATION ends with "/", the source file name is kept.
+
+    The transfers are submitted as one batch only after the confirmation
+    prompt is accepted, then polled once per second until every transfer is
+    either COMPLETED or FAILED. Aborts (typer.Abort) when the destination is
+    invalid for a directory source, when the source metadata cannot be
+    fetched, or when the prompt is declined.
+    """
+    source_storage_id, source_secret_id = fetch_storage_and_secret_ids(source.split("/")[0])
+    dest_storage_id, dest_secret_id = fetch_storage_and_secret_ids(destination.split("/")[0])
+
+    ## TODO : Check agent availability and deploy cloud agents if required
+
+    source_metadata = get_resource_metadata(source)
+    metadata_kind = source_metadata.WhichOneof('metadata')
+    # Path inside the destination storage, i.e. everything after "<storage>/".
+    dest_root = destination[len(destination.split("/")[0]) + 1:]
+    transfer_requests = []
+    total_volume = 0
+
+    if metadata_kind == 'directory':
+        # endswith() instead of destination[-1] so an empty destination cannot raise IndexError.
+        if not destination.endswith("/"):
+            print("Source is a directory path so destination path should end with /")
+            raise typer.Abort()
+
+        file_list = []
+        flatten_directories(source_metadata.directory, "", file_list)
+        for file_meta, relative_path in file_list:
+            transfer_requests.append(MFTTransferApi_pb2.TransferApiRequest(
+                sourcePath = file_meta.resourcePath,
+                sourceStorageId = source_storage_id,
+                sourceSecretId = source_secret_id,
+                destinationPath = dest_root + relative_path,
+                destinationStorageId = dest_storage_id,
+                destinationSecretId = dest_secret_id))
+            total_volume += file_meta.resourceSize
+
+    elif metadata_kind == 'file':
+        # A destination ending with "/" names a directory: keep the source file name.
+        if destination.endswith("/"):
+            dest_root = dest_root + source_metadata.file.friendlyName
+
+        transfer_requests.append(MFTTransferApi_pb2.TransferApiRequest(
+            sourcePath = source_metadata.file.resourcePath,
+            sourceStorageId = source_storage_id,
+            sourceSecretId = source_secret_id,
+            destinationPath = dest_root,
+            destinationStorageId = dest_storage_id,
+            destinationSecretId = dest_secret_id))
+
+        total_volume += source_metadata.file.resourceSize
+
+    elif metadata_kind == 'error':
+        print("Failed while fetching source details")
+        # The response holding the error is source_metadata; no metadata_resp exists in this function.
+        print(source_metadata.error)
+        raise typer.Abort()
+
+    confirm = typer.confirm("Total number of " + str(len(transfer_requests)) +
+                            " files to be transferred. Total volume is " + str(total_volume)
+                            + " bytes. Do you want to start the transfer? ", True)
+
+    # Check the answer before submitting, so that declining never starts a transfer.
+    if not confirm:
+        raise typer.Abort()
+
+    batch_transfer_request = MFTTransferApi_pb2.BatchTransferApiRequest()
+    batch_transfer_request.transferRequests.extend(transfer_requests)
+
+    client = mft_client.MFTClient()
+    batch_transfer_resp = client.transfer_api.submitBatchTransfer(batch_transfer_request)
+
+    transfer_ids = batch_transfer_resp.transferIds
+
+    state_requests = []
+    for transfer_id in transfer_ids:
+        state_requests.append(MFTTransferApi_pb2.TransferStateApiRequest(transferId=transfer_id))
+
+    ## TODO: This has to be optimized and avoid frequent polling of all transfer ids in each iteration
+    ## Possible fix is to introduce a parent batch transfer id at the API level and fetch child transfer id
+    # summaries in a single API call
+
+    completed = 0
+    failed = 0
+    # Number of finished transfers already shown on the progress bar.
+    reported = 0
+
+    with typer.progressbar(length=len(transfer_ids)) as progress:
+
+        while 1:
+            completed = 0
+            failed = 0
+            for state_request in state_requests:
+                state_resp = client.transfer_api.getTransferState(state_request)
+                if state_resp.state == "COMPLETED":
+                    completed += 1
+                elif state_resp.state == "FAILED":
+                    failed += 1
+
+            total = completed + failed
+            # update() advances the bar by a number of steps, so only pass the
+            # transfers that finished since the previous poll.
+            if total > reported:
+                progress.update(total - reported)
+                reported = total
+            if (total == len(transfer_ids)):
+                break
+            time.sleep(1)
+
+    print(f"Processed {completed + failed} files. Completed {completed}, Failed {failed}.")
if __name__ == "__main__":
app()
\ No newline at end of file
diff --git a/python-sdk/setup.cfg b/python-sdk/setup.cfg
index 0a18af5..7843b72 100644
--- a/python-sdk/setup.cfg
+++ b/python-sdk/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = airavata_mft_sdk
-version = 0.0.1-alpha18
+version = 0.0.1-alpha19
author = Airavata MFT Developers
author_email = [email protected]
description = Python SDK for Apache Airavata Managed File Transfers (MFT)
diff --git a/python-sdk/src/airavata_mft_sdk/MFTTransferApi_pb2.py
b/python-sdk/src/airavata_mft_sdk/MFTTransferApi_pb2.py
index ccfb3ff..623d5dd 100644
--- a/python-sdk/src/airavata_mft_sdk/MFTTransferApi_pb2.py
+++ b/python-sdk/src/airavata_mft_sdk/MFTTransferApi_pb2.py
@@ -16,7 +16,7 @@ import airavata_mft_sdk.CredCommon_pb2 as CredCommon__pb2
import airavata_mft_sdk.MFTAgentStubs_pb2 as MFTAgentStubs__pb2
-DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x14MFTTransferApi.proto\x12#org.apache.airavata.mft.api.service\x1a\x10\x43redCommon.proto\x1a\x13MFTAgentStubs.proto\"\x9b\x01\n\x10\x43\x61llbackEndpoint\x12P\n\x04type\x18\x01
\x01(\x0e\x32\x42.org.apache.airavata.mft.api.service.CallbackEndpoint.CallbackType\x12\x10\n\x08\x65ndpoint\x18\x02
\x01(\t\"#\n\x0c\x43\x61llbackType\x12\x08\n\x04HTTP\x10\x00\x12\t\n\x05KAFKA\x10\x01\"\xf3\x03\n\x12TransferApiRequest\x12\x12\n\nso
[...]
+DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x14MFTTransferApi.proto\x12#org.apache.airavata.mft.api.service\x1a\x10\x43redCommon.proto\x1a\x13MFTAgentStubs.proto\"\x9b\x01\n\x10\x43\x61llbackEndpoint\x12P\n\x04type\x18\x01
\x01(\x0e\x32\x42.org.apache.airavata.mft.api.service.CallbackEndpoint.CallbackType\x12\x10\n\x08\x65ndpoint\x18\x02
\x01(\t\"#\n\x0c\x43\x61llbackType\x12\x08\n\x04HTTP\x10\x00\x12\t\n\x05KAFKA\x10\x01\"\xf9\x03\n\x12TransferApiRequest\x12\x12\n\nso
[...]
@@ -154,33 +154,33 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_CALLBACKENDPOINT_CALLBACKTYPE._serialized_start=221
_CALLBACKENDPOINT_CALLBACKTYPE._serialized_end=256
_TRANSFERAPIREQUEST._serialized_start=259
- _TRANSFERAPIREQUEST._serialized_end=758
- _TRANSFERAPIREQUEST_TARGETAGENTSENTRY._serialized_start=707
- _TRANSFERAPIREQUEST_TARGETAGENTSENTRY._serialized_end=758
- _TRANSFERAPIRESPONSE._serialized_start=760
- _TRANSFERAPIRESPONSE._serialized_end=801
- _BATCHTRANSFERAPIREQUEST._serialized_start=803
- _BATCHTRANSFERAPIREQUEST._serialized_end=911
- _BATCHTRANSFERAPIRESPONSE._serialized_start=913
- _BATCHTRANSFERAPIRESPONSE._serialized_end=960
- _HTTPUPLOADAPIREQUEST._serialized_start=963
- _HTTPUPLOADAPIREQUEST._serialized_end=1158
- _HTTPUPLOADAPIRESPONSE._serialized_start=1160
- _HTTPUPLOADAPIRESPONSE._serialized_end=1217
- _HTTPDOWNLOADAPIREQUEST._serialized_start=1220
- _HTTPDOWNLOADAPIREQUEST._serialized_end=1407
- _HTTPDOWNLOADAPIRESPONSE._serialized_start=1409
- _HTTPDOWNLOADAPIRESPONSE._serialized_end=1468
- _TRANSFERSTATEAPIREQUEST._serialized_start=1470
- _TRANSFERSTATEAPIREQUEST._serialized_end=1589
- _TRANSFERSTATEAPIRESPONSE._serialized_start=1591
- _TRANSFERSTATEAPIRESPONSE._serialized_end=1697
- _RESOURCEAVAILABILITYRESPONSE._serialized_start=1699
- _RESOURCEAVAILABILITYRESPONSE._serialized_end=1748
- _GETRESOURCEMETADATAFROMIDSREQUEST._serialized_start=1750
- _GETRESOURCEMETADATAFROMIDSREQUEST._serialized_end=1869
- _FETCHRESOURCEMETADATAREQUEST._serialized_start=1872
- _FETCHRESOURCEMETADATAREQUEST._serialized_end=2169
- _MFTTRANSFERSERVICE._serialized_start=2172
- _MFTTRANSFERSERVICE._serialized_end=3360
+ _TRANSFERAPIREQUEST._serialized_end=764
+ _TRANSFERAPIREQUEST_TARGETAGENTSENTRY._serialized_start=713
+ _TRANSFERAPIREQUEST_TARGETAGENTSENTRY._serialized_end=764
+ _TRANSFERAPIRESPONSE._serialized_start=766
+ _TRANSFERAPIRESPONSE._serialized_end=807
+ _BATCHTRANSFERAPIREQUEST._serialized_start=809
+ _BATCHTRANSFERAPIREQUEST._serialized_end=917
+ _BATCHTRANSFERAPIRESPONSE._serialized_start=919
+ _BATCHTRANSFERAPIRESPONSE._serialized_end=966
+ _HTTPUPLOADAPIREQUEST._serialized_start=969
+ _HTTPUPLOADAPIREQUEST._serialized_end=1167
+ _HTTPUPLOADAPIRESPONSE._serialized_start=1169
+ _HTTPUPLOADAPIRESPONSE._serialized_end=1226
+ _HTTPDOWNLOADAPIREQUEST._serialized_start=1229
+ _HTTPDOWNLOADAPIREQUEST._serialized_end=1419
+ _HTTPDOWNLOADAPIRESPONSE._serialized_start=1421
+ _HTTPDOWNLOADAPIRESPONSE._serialized_end=1480
+ _TRANSFERSTATEAPIREQUEST._serialized_start=1482
+ _TRANSFERSTATEAPIREQUEST._serialized_end=1601
+ _TRANSFERSTATEAPIRESPONSE._serialized_start=1603
+ _TRANSFERSTATEAPIRESPONSE._serialized_end=1709
+ _RESOURCEAVAILABILITYRESPONSE._serialized_start=1711
+ _RESOURCEAVAILABILITYRESPONSE._serialized_end=1760
+ _GETRESOURCEMETADATAFROMIDSREQUEST._serialized_start=1762
+ _GETRESOURCEMETADATAFROMIDSREQUEST._serialized_end=1881
+ _FETCHRESOURCEMETADATAREQUEST._serialized_start=1884
+ _FETCHRESOURCEMETADATAREQUEST._serialized_end=2181
+ _MFTTRANSFERSERVICE._serialized_start=2184
+ _MFTTRANSFERSERVICE._serialized_end=3372
# @@protoc_insertion_point(module_scope)