This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fb40c0  Modifiy workflow manager to pass required parameters from 
dataorcehstrator
     new e5be829  Merge pull request #28 from isururanawaka/workflow_invocation
2fb40c0 is described below

commit 2fb40c02a62b282fa67e1298e04fbb7d18f9c4d2
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Mon Jul 12 23:19:59 2021 -0400

    Modifiy workflow manager to pass required parameters from dataorcehstrator
---
 .../orchestrator/connectors/DRMSConnector.java     | 63 +++++++++++-----------
 .../connectors/WorkflowServiceConnector.java       | 29 ++++------
 .../processor/OutboundEventProcessor.java          | 46 +++++++++++++---
 .../datasync-workflow-manager/pom.xml              |  6 +++
 .../wm/datasync/DataSyncWorkflowManager.java       | 40 +++++++++-----
 .../wm/datasync/WorkflowEngineAPIHandler.java      |  2 +-
 .../src/main/resources/application.properties      |  8 ++-
 .../src/main/resources/application.properties      | 12 ++---
 .../engine/task/impl/AsyncDataTransferTask.java    | 14 ++++-
 .../src/main/proto/service/WorkflowService.proto   |  7 ++-
 10 files changed, 147 insertions(+), 80 deletions(-)

diff --git 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index 6e8f584..a0b1292 100644
--- 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -29,6 +29,7 @@ public class DRMSConnector implements 
AbstractConnector<Configuration> {
     private ManagedChannel drmsChannel;
     private ResourceServiceGrpc.ResourceServiceBlockingStub 
resourceServiceBlockingStub;
     private StorageServiceGrpc.StorageServiceBlockingStub 
storageServiceBlockingStub;
+    private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub 
storagePreferenceServiceBlockingStub;
 
     public DRMSConnector(Configuration configuration) throws Exception {
         this.init(configuration);
@@ -41,6 +42,7 @@ public class DRMSConnector implements 
AbstractConnector<Configuration> {
                         
configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
         this.resourceServiceBlockingStub = 
ResourceServiceGrpc.newBlockingStub(drmsChannel);
         this.storageServiceBlockingStub = 
StorageServiceGrpc.newBlockingStub(drmsChannel);
+        this.storagePreferenceServiceBlockingStub = 
StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
 
     }
 
@@ -74,7 +76,7 @@ public class DRMSConnector implements 
AbstractConnector<Configuration> {
                 if (transferMapping.getSourceStorage().getStorageCase()
                         .equals(AnyStorage.StorageCase.SSH_STORAGE)) {
                     if 
(transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname))
 {
-                            transferMappingOp.set(transferMapping);
+                        transferMappingOp.set(transferMapping);
                     }
                 }
             });
@@ -82,35 +84,6 @@ public class DRMSConnector implements 
AbstractConnector<Configuration> {
         return Optional.ofNullable(transferMappingOp.get());
     }
 
-    public Optional<String> getDestinationStorageId(DataOrchestratorEntity 
entity, String hostname) {
-        DRMSServiceAuthToken serviceAuthToken = 
DRMSServiceAuthToken.newBuilder()
-                .setAccessToken(entity.getAuthToken())
-                
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                        .setUsername(entity.getOwnerId())
-                        .setTenantId(entity.getTenantId())
-                        .build())
-                .build();
-        FindTransferMappingsRequest request = 
FindTransferMappingsRequest.newBuilder()
-                .setAuthToken(serviceAuthToken)
-                .build();
-        FindTransferMappingsResponse response = 
storageServiceBlockingStub.getTransferMappings(request);
-        List<TransferMapping> transferMappingList = response.getMappingsList();
-        AtomicReference<String> storagePreferenceId = new 
AtomicReference<>(null);
-        if (!transferMappingList.isEmpty()) {
-            transferMappingList.forEach(transferMapping -> {
-                if (transferMapping.getDestinationStorage().getStorageCase()
-                        
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-                    if 
(transferMapping.getDestinationStorage().getSshStorage().getHostName().equals(hostname))
 {
-                        storagePreferenceId
-                                
.set(transferMapping.getDestinationStorage().getSshStorage().getStorageId());
-                    }
-                }
-            });
-        }
-        return Optional.ofNullable(storagePreferenceId.get());
-    }
-
 
     public Optional<GenericResource> 
createResource(DataOrchestratorEventRepository repository, 
DataOrchestratorEntity entity,
                                                     String resourceId,
@@ -151,4 +124,34 @@ public class DRMSConnector implements 
AbstractConnector<Configuration> {
             return Optional.empty();
         }
     }
+
+    public Optional<AnyStoragePreference> getStoragePreference(String 
authToken, String username, String tenantId, String storageId) {
+        DRMSServiceAuthToken serviceAuthToken = 
DRMSServiceAuthToken.newBuilder()
+                .setAccessToken(authToken)
+                
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                        .setUsername(username)
+                        .setTenantId(tenantId)
+                        .build())
+                .build();
+        StoragePreferenceSearchQuery searchQuery = StoragePreferenceSearchQuery
+                .newBuilder()
+                .setField("storageId")
+                .setValue(storageId)
+                .build();
+
+        StoragePreferenceSearchRequest storagePreferenceSearchRequest = 
StoragePreferenceSearchRequest
+                .newBuilder()
+                .setAuthToken(serviceAuthToken)
+                .addQueries(searchQuery)
+                .build();
+        StoragePreferenceSearchResponse response = 
storagePreferenceServiceBlockingStub
+                .searchStoragePreference(storagePreferenceSearchRequest);
+        List<AnyStoragePreference> preferences = 
response.getStoragesPreferenceList();
+        if (!preferences.isEmpty()) {
+            return Optional.ofNullable(preferences.get(0));
+        }
+        return Optional.empty();
+    }
+
 }
diff --git 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
index d3cbc5e..1844054 100644
--- 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
+++ 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -2,13 +2,8 @@ package org.apache.airavata.datalake.orchestrator.connectors;
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import org.apache.airavata.datalake.drms.resource.GenericResource;
 import org.apache.airavata.datalake.orchestrator.Configuration;
 import 
org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
-import 
org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
-import 
org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
-import 
org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
-import 
org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
@@ -48,25 +43,23 @@ public class WorkflowServiceConnector implements 
AbstractConnector<Configuration
         return false;
     }
 
-    public void invokeWorkflow(DataOrchestratorEventRepository repository, 
DataOrchestratorEntity entity, GenericResource resource) {
+    public void invokeWorkflow(String username, String tenantId, String 
sourceResourceId, String sourceCredentialToken,
+                               String dstResourceId, String 
destinationCredentialToken) {
         try {
-            WorkflowServiceAuthToken workflowServiceAuthToken = 
WorkflowServiceAuthToken
-                    .newBuilder()
-                    .setAccessToken("")
-                    .build();
             WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
-                    .setResourceId(resource.getResourceId())
+                    .setSourceResourceId(sourceResourceId)
+                    .setDestinationResourceId(dstResourceId)
+                    .setUsername(username)
+                    .setTenantId(tenantId)
+                    .setSourceCredentialToken(sourceCredentialToken)
+                    .setDestinationCredentialToken(destinationCredentialToken)
                     .build();
-
             WorkflowInvocationRequest workflowInvocationRequest = 
WorkflowInvocationRequest
-                    
.newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
+                    .newBuilder().setMessage(workflowMessage).build();
             this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
         } catch (Exception ex) {
-            LOGGER.error("Error occurred while invoking workflow engine", 
entity.getResourceId(), ex);
-            entity.setEventStatus(EventStatus.ERRORED.name());
-            entity.setError("Error occurred while invoking workflow engine" + 
ex.getMessage());
-            repository.save(entity);
-            return;
+            String msg = "Error occurred while invoking workflow engine " + 
ex.getMessage();
+            throw new RuntimeException(msg, ex);
         }
     }
 }
diff --git 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 13d4961..8ad401b 100644
--- 
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ 
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.datalake.orchestrator.processor;
 
 import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
 import org.apache.airavata.datalake.drms.storage.TransferMapping;
 import org.apache.airavata.datalake.orchestrator.Configuration;
 import org.apache.airavata.datalake.orchestrator.Utils;
@@ -126,8 +127,8 @@ public class OutboundEventProcessor implements 
MessageProcessor<Configuration> {
                             parentId, "FILE");
 
             String dstResourceHost = 
transferMapping.getDestinationStorage().getSshStorage().getHostName();
-            String destinationResourceId = dstResourceHost+":"+ 
entity.getResourcePath() + ":" + entity.getResourceType();
-            String messageId  = Utils.getId(destinationResourceId);
+            String destinationResourceId = dstResourceHost + ":" + 
entity.getResourcePath() + ":" + entity.getResourceType();
+            String messageId = Utils.getId(destinationResourceId);
 
             Optional<GenericResource> destinationFile = 
this.drmsConnector.createResource(repository, entity, messageId,
                     entity.getResourceName(),
@@ -136,11 +137,42 @@ public class OutboundEventProcessor implements 
MessageProcessor<Configuration> {
                     "FILE");
 
             if (optionalGenericResource.isPresent() && 
destinationFile.isPresent()) {
-                this.workflowServiceConnector.invokeWorkflow(repository, 
entity, optionalGenericResource.get());
-                
entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
-                repository.save(entity);
-            } else {
-
+                try {
+
+                    Optional<AnyStoragePreference> storagePreferenceOptional = 
this.drmsConnector
+                            .getStoragePreference(entity.getAuthToken(), 
entity.getOwnerId(), entity.getTenantId(), sourceStorageId);
+
+                    Optional<AnyStoragePreference> 
destinationPreferenceOptional = this.drmsConnector
+                            .getStoragePreference(entity.getAuthToken(), 
entity.getOwnerId(), entity.getTenantId(), destinationStorageId);
+                    if (storagePreferenceOptional.isPresent() && 
destinationPreferenceOptional.isPresent()) {
+                        String sourceCredentialToken = 
storagePreferenceOptional.get()
+                                .getSshStoragePreference()
+                                .getCredentialToken();
+                        String destinationCredentialToken = 
storagePreferenceOptional.get()
+                                .getSshStoragePreference()
+                                .getCredentialToken();
+
+                        
this.workflowServiceConnector.invokeWorkflow(entity.getOwnerId(),
+                                entity.getTenantId(), entity.getResourceId(), 
sourceCredentialToken,
+                                messageId, destinationCredentialToken);
+                        
entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
+                        repository.save(entity);
+                    } else {
+                        String msg = "Cannot find storage preference for 
storage " + sourceStorageId + " for user " + entity.getOwnerId();
+                        entity.setError(msg);
+                        entity.setEventStatus(EventStatus.ERRORED.name());
+                        repository.save(entity);
+                        LOGGER.error(msg);
+                    }
+
+
+                } catch (Exception exception) {
+                    String msg = "Error occurred while invoking workflow 
manager" + exception.getMessage();
+                    entity.setError("Error occurred while invoking workflow 
manager " + exception.getMessage());
+                    entity.setEventStatus(EventStatus.ERRORED.name());
+                    repository.save(entity);
+                    LOGGER.error(msg, exception);
+                }
             }
         } catch (Exception exception) {
             LOGGER.error("Error occurred while processing outbound data 
orchestrator event", exception);
diff --git 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
index 83e45c5..84073c9 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
@@ -48,6 +48,12 @@
             <artifactId>spring-boot-starter</artifactId>
             <version>${spring.boot.version}</version>
         </dependency>
+        <dependency>
+            <groupId>net.sf.dozer</groupId>
+            <artifactId>dozer</artifactId>
+            <version>5.5.1</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
index 014709f..da2d886 100644
--- 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
+++ 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
@@ -19,6 +19,7 @@ package 
org.apache.airavata.datalake.workflow.engine.wm.datasync;
 
 import org.apache.airavata.datalake.mft.listener.DataTransferEvent;
 import org.apache.airavata.datalake.mft.listener.DataTransferEventDeserializer;
+import 
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
 import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
 import 
org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask;
@@ -31,6 +32,7 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 
 import java.time.Duration;
 import java.util.*;
@@ -64,6 +66,19 @@ public class DataSyncWorkflowManager {
     
@org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.port}")
     private int datasyncWmPort;
 
+
+    @Value("${mft.callback.url}")
+    private String callbackURL;
+    @Value("${mft.host}")
+    private String mftHost;
+    @Value("${mft.port}")
+    private int mftPort;
+    @Value("${mft.clientId}")
+    private String mftClientId;
+    @Value("${mft.clientSecret}")
+    private String mftClientSecret;
+
+
     @Autowired
     private CallbackWorkflowStore callbackWorkflowStore;
 
@@ -161,21 +176,22 @@ public class DataSyncWorkflowManager {
         logger.info("Successfully initialized DatasyncWorkflow Manager");
     }
 
-    public void submitDataSyncWorkflow() throws Exception {
+    public void submitDataSyncWorkflow(WorkflowInvocationRequest 
workflowInvocationRequest) throws Exception {
         AsyncDataTransferTask dt1 = new AsyncDataTransferTask();
-        dt1.setSourceResourceId("");
-        dt1.setDestinationResourceId("");
-        dt1.setSourceCredToken("");
-        dt1.setDestinationCredToken("");
-        dt1.setCallbackUrl("localhost:33335");
-        dt1.setMftHost("localhost");
-        dt1.setMftPort(7004);
-        dt1.setMftClientId("");
-        dt1.setMftClientSecret("");
-        dt1.setUserId("dimuthu");
+        
dt1.setSourceResourceId(workflowInvocationRequest.getMessage().getSourceResourceId());
+        
dt1.setDestinationResourceId(workflowInvocationRequest.getMessage().getDestinationResourceId());
+        
dt1.setSourceCredToken(workflowInvocationRequest.getMessage().getSourceCredentialToken());
+        
dt1.setDestinationCredToken(workflowInvocationRequest.getMessage().getDestinationCredentialToken());
+        dt1.setTenantId(workflowInvocationRequest.getMessage().getTenantId());
+        dt1.setCallbackUrl(callbackURL);
+        dt1.setMftHost(mftHost);
+        dt1.setMftPort(mftPort);
+        dt1.setMftClientId(mftClientId);
+        dt1.setMftClientSecret(mftClientSecret);
+        dt1.setUserId(workflowInvocationRequest.getMessage().getUsername());
         dt1.setCurrentSection(1);
         dt1.setTaskId("dt-" + UUID.randomUUID().toString());
-        dt1.setMftCallbackStoreHost(datasyncWmHost);
+  v     dt1.setMftCallbackStoreHost(datasyncWmHost);
         dt1.setMftCallbackStorePort(datasyncWmPort);
 
         Map<String, AbstractTask> taskMap = new HashMap<>();
diff --git 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index 3b88944..b772833 100644
--- 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++ 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -44,7 +44,7 @@ public class WorkflowEngineAPIHandler extends 
WorkflowServiceGrpc.WorkflowServic
     public void invokeWorkflow(WorkflowInvocationRequest request,
                                StreamObserver<WorkflowInvocationResponse> 
responseObserver) {
         try {
-            dataSyncWorkflowManager.submitDataSyncWorkflow();
+            dataSyncWorkflowManager.submitDataSyncWorkflow(request);
             
responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
             responseObserver.onCompleted();
         } catch (Exception ex) {
diff --git 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
index 9a1b4f9..7df994f 100644
--- 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
+++ 
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
@@ -29,4 +29,10 @@ kafka.mft.status.consumer.group=mft-even-group
 
 datasync.wm.grpc.host=localhost
 datasync.wm.grpc.port=6565
-spring.config.use-legacy-processing=true
\ No newline at end of file
+spring.config.use-legacy-processing=true
+
+mft.callback.url=localhost:33335
+mft.host=localhost
+mft.port=7004
+mft.clientId=wsedcfvrtgrtgrtg
+mft.clientSecret=rtgrtgrtgrtgt
\ No newline at end of file
diff --git 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
 
b/data-orchestrator/workflow-engine/workflow-engine-controller/src/main/resources/application.properties
similarity index 74%
copy from 
data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
copy to 
data-orchestrator/workflow-engine/workflow-engine-controller/src/main/resources/application.properties
index 9a1b4f9..9caa8a2 100644
--- 
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
+++ 
b/data-orchestrator/workflow-engine/workflow-engine-controller/src/main/resources/application.properties
@@ -20,13 +20,7 @@
 cluster.name=datalake
 zookeeper.connection=localhost:2181
 
-datasync.wm.name=datasync_wf
+controller.name=helix_controller
+task.list.file=task-list.yaml
 
-kafka.url=localhost:9092
-kafka.mft.publisher.name=mft-status-publisher
-kafka.mft.status.publish.topic=mft-status-topic
-kafka.mft.status.consumer.group=mft-even-group
-
-datasync.wm.grpc.host=localhost
-datasync.wm.grpc.port=6565
-spring.config.use-legacy-processing=true
\ No newline at end of file
+server.port=33335
\ No newline at end of file
diff --git 
a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
 
b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
index a3c400e..4d9c155 100644
--- 
a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
+++ 
b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
@@ -60,6 +60,9 @@ public class AsyncDataTransferTask extends 
BiSectionNonBlockingTask {
     @TaskParam(name = "MFTCallbackStorePort")
     private final ThreadLocal<Integer> mftCallbackStorePort = new 
ThreadLocal<>();
 
+    @TaskParam(name = "TenantId")
+    private final ThreadLocal<String> tenantId = new ThreadLocal<>();
+
 
     public TaskResult beforeSection() {
         MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = 
MFTApiClient.buildClient(getMftHost(), getMftPort());
@@ -176,7 +179,7 @@ public class AsyncDataTransferTask extends 
BiSectionNonBlockingTask {
     }
 
     public void setMftClientSecret(String mftClientSecret) {
-        this.mftClientSecret.set( mftClientSecret);
+        this.mftClientSecret.set(mftClientSecret);
     }
 
     public String getMftCallbackStoreHost() {
@@ -194,4 +197,13 @@ public class AsyncDataTransferTask extends 
BiSectionNonBlockingTask {
     public void setMftCallbackStorePort(Integer mftCallbackStorePort) {
         this.mftCallbackStorePort.set(mftCallbackStorePort);
     }
+
+    public String getTenantId() {
+        return tenantId.get();
+    }
+
+    public void setTenantId(String tenantId) {
+        this.tenantId.set(tenantId);
+    }
+
 }
diff --git 
a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
 
b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index a3557b1..c814b19 100644
--- 
a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++ 
b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -27,10 +27,15 @@ import "google/protobuf/empty.proto";
 message WorkflowMessage {
     string message_id = 1;
     string resource_id = 2;
+    string source_resource_id = 3;
+    string destination_resource_id = 4;
+    string username = 5;
+    string tenantId = 6;
+    string source_credential_token = 7;
+    string destination_credential_token = 8;
 }
 
 message WorkflowInvocationRequest {
-    
org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken 
auth_token = 1;
     WorkflowMessage message = 2;
 }
 

Reply via email to