This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 1a5958e Supporting bulk transfers at cli
1a5958e is described below
commit 1a5958ee5a193e3f84f776756e92db654a5a30d1
Author: DImuthuUpe <[email protected]>
AuthorDate: Wed Jul 19 15:50:31 2023 -0400
Supporting bulk transfers at cli
---
python-cli/mft_cli/airavata_mft_cli/base.py | 7 ++
python-cli/mft_cli/airavata_mft_cli/operations.py | 83 +++++++++++++++++++++-
.../mft_cli/airavata_mft_cli/storage/http.py | 8 +--
python-cli/mft_cli/pyproject.toml | 3 +-
4 files changed, 94 insertions(+), 7 deletions(-)
diff --git a/python-cli/mft_cli/airavata_mft_cli/base.py
b/python-cli/mft_cli/airavata_mft_cli/base.py
index 1d1e2d3..553747f 100644
--- a/python-cli/mft_cli/airavata_mft_cli/base.py
+++ b/python-cli/mft_cli/airavata_mft_cli/base.py
@@ -37,6 +37,13 @@ def copy(source, destination):
except Exception as e:
exception_handler(e)
+@app.command("cp-list")
+def copy_list(source_storage_id, dest_storage_id, list_file):
+ try:
+ operations.copy_list(source_storage_id, dest_storage_id, list_file)
+ except Exception as e:
+ exception_handler(e)
+
@app.command("init")
def init_mft():
bootstrap.start_mft()
diff --git a/python-cli/mft_cli/airavata_mft_cli/operations.py
b/python-cli/mft_cli/airavata_mft_cli/operations.py
index 40fbe64..8ed867d 100644
--- a/python-cli/mft_cli/airavata_mft_cli/operations.py
+++ b/python-cli/mft_cli/airavata_mft_cli/operations.py
@@ -26,6 +26,7 @@ import time
import sys
sys.path.append('.')
from . import config as configcli
+import pandas as pd
def fetch_storage_and_secret_ids(storage_name):
client = mft_client.MFTClient(transfer_api_port =
configcli.transfer_api_port,
@@ -193,13 +194,93 @@ def copy(source, destination):
completed = 0
failed = 0
+ progress_percentage = 0
with typer.progressbar(length=100) as progress:
while 1:
state_resp = client.transfer_api.getTransferStateSummary(state_request)
- progress.update(int(state_resp.percentage * 100))
+ progress_percentage = int(state_resp.percentage * 100)
+ progress.update(progress_percentage - prev_percentage)
+ prev_percentage = progress_percentage
+
+ if (state_resp.percentage == 1.0):
+ completed = len(state_resp.completed)
+ failed = len(state_resp.failed)
+ break
+
+ if (state_resp.state == "FAILED"):
+ print("Transfer failed. Reason: " + state_resp.description)
+ raise typer.Abort()
+ time.sleep(1)
+
+ print(f"Processed {completed + failed} files. Completed {completed}, Failed
{failed}.")
+
+def copy_list(source_storage_id, dest_storage_id, list_file):
+
+ source_storage_id, source_secret_id =
fetch_storage_and_secret_ids(source_storage_id)
+ dest_storage_id, dest_secret_id =
fetch_storage_and_secret_ids(dest_storage_id)
+
+ ## TODO : Check agent availability and deploy cloud agents if required
+
+ file_list = []
+ endpoint_paths = []
+ total_volume = 0
+
+ transfer_request = MFTTransferApi_pb2.TransferApiRequest(sourceStorageId =
source_storage_id,
+ sourceSecretId =
source_secret_id,
+
destinationStorageId = dest_storage_id,
+ destinationSecretId
= dest_secret_id,
+
optimizeTransferPath = False)
+ columns=['source', 'destination']
+ df = pd.read_csv(list_file, header=None, dtype=str, names=columns)
+ for i in range(len(df)):
+ source_path = df['source'][i]
+ destination_path = df['destination'][i]
+
+ endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(sourcePath =
source_path,
+ destinationPath = destination_path))
+
+ transfer_request.endpointPaths.extend(endpoint_paths)
+
+ confirm = typer.confirm("Total number of " + str(len(endpoint_paths)) +
+ " files to be transferred. Do you want to start the
transfer? ", True)
+
+ if not confirm:
+ raise typer.Abort()
+
+ client = mft_client.MFTClient(transfer_api_port =
configcli.transfer_api_port,
+ transfer_api_secured =
configcli.transfer_api_secured,
+ resource_service_host =
configcli.resource_service_host,
+ resource_service_port =
configcli.resource_service_port,
+ resource_service_secured =
configcli.resource_service_secured,
+ secret_service_host =
configcli.secret_service_host,
+ secret_service_port =
configcli.secret_service_port)
+
+ transfer_resp = client.transfer_api.submitTransfer(transfer_request)
+
+ transfer_id = transfer_resp.transferId
+
+ state_request =
MFTTransferApi_pb2.TransferStateApiRequest(transferId=transfer_id)
+
+ ## TODO: This has to be optimized and avoid frequent polling of all transfer
ids in each iteration
+ ## Possible fix is to introduce a parent batch transfer id at the API level
and fetch child trnasfer id
+ # summaries in a single API call
+
+ completed = 0
+ failed = 0
+
+ prev_percentage = 0
+ with typer.progressbar(length=100) as progress:
+
+ while 1:
+ state_resp = client.transfer_api.getTransferStateSummary(state_request)
+
+ progress_percentage = int(state_resp.percentage * 100)
+ progress.update(progress_percentage - prev_percentage)
+ prev_percentage = progress_percentage
+
if (state_resp.percentage == 1.0):
completed = len(state_resp.completed)
failed = len(state_resp.failed)
diff --git a/python-cli/mft_cli/airavata_mft_cli/storage/http.py
b/python-cli/mft_cli/airavata_mft_cli/storage/http.py
index d53976e..cf6f864 100644
--- a/python-cli/mft_cli/airavata_mft_cli/storage/http.py
+++ b/python-cli/mft_cli/airavata_mft_cli/storage/http.py
@@ -42,15 +42,15 @@ def handle_add_storage():
secret_service_host =
configcli.secret_service_host,
secret_service_port =
configcli.secret_service_port)
- base_url = typer.prompt("Base URL", "sawad")
+ base_url = typer.prompt("Base URL")
storage_name = typer.prompt("Storage Name", base_url)
options = ["Basic Auth", "Token" ]
option, index = pick(options, "What is the authentication method",
indicator="=>")
if index == 0:
- user_name = typer.prompt("User Name", "dds")
- password = typer.prompt("Password", "sasd")
+ user_name = typer.prompt("User Name")
+ password = typer.prompt("Password")
basic_auth = HttpCredential_pb2.BasicAuth(userName=user_name,
password=password)
http_secret = HttpCredential_pb2.HTTPSecret(basic=basic_auth)
elif index == 1:
@@ -64,8 +64,6 @@ def handle_add_storage():
http_storage_create_req = HTTPStorage_pb2.HTTPStorageCreateRequest(
baseUrl=base_url, name=storage_name)
- print("CP1")
- print(http_storage_create_req)
created_storage =
client.http_storage_api.createHTTPStorage(http_storage_create_req)
secret_for_storage_req = StorageCommon_pb2.SecretForStorage(storageId =
created_storage.storageId,
diff --git a/python-cli/mft_cli/pyproject.toml
b/python-cli/mft_cli/pyproject.toml
index 5a33477..5d54cc2 100644
--- a/python-cli/mft_cli/pyproject.toml
+++ b/python-cli/mft_cli/pyproject.toml
@@ -18,7 +18,7 @@
[tool.poetry]
name = "airavata-mft-cli"
-version = "0.1.11"
+version = "0.1.13"
description = "Command Line Client for Apache Airavata MFT data transfer
software"
authors = [
"Dimuthu Wannipurage <[email protected]>",
@@ -41,6 +41,7 @@ pick = {version= "2.2.0"}
grpcio= [{version="1.46.3", markers = "platform_machine !=
'arm64'"},{version="1.47.0rc1", markers = "platform_machine == 'arm64'"}]
grpcio-tools = [{version="1.46.3", markers = "platform_machine !=
'arm64'"},{version="1.47.0rc1", markers = "platform_machine == 'arm64'"}]
airavata-mft-sdk = "0.0.1a33"
+pandas = "^2.0.3"
[build-system]
requires = ["poetry-core"]