This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new 2fb40c0 Modify workflow manager to pass required parameters from
data orchestrator
new e5be829 Merge pull request #28 from isururanawaka/workflow_invocation
2fb40c0 is described below
commit 2fb40c02a62b282fa67e1298e04fbb7d18f9c4d2
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Mon Jul 12 23:19:59 2021 -0400
Modify workflow manager to pass required parameters from data orchestrator
---
.../orchestrator/connectors/DRMSConnector.java | 63 +++++++++++-----------
.../connectors/WorkflowServiceConnector.java | 29 ++++------
.../processor/OutboundEventProcessor.java | 46 +++++++++++++---
.../datasync-workflow-manager/pom.xml | 6 +++
.../wm/datasync/DataSyncWorkflowManager.java | 40 +++++++++-----
.../wm/datasync/WorkflowEngineAPIHandler.java | 2 +-
.../src/main/resources/application.properties | 8 ++-
.../src/main/resources/application.properties | 12 ++---
.../engine/task/impl/AsyncDataTransferTask.java | 14 ++++-
.../src/main/proto/service/WorkflowService.proto | 7 ++-
10 files changed, 147 insertions(+), 80 deletions(-)
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index 6e8f584..a0b1292 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -29,6 +29,7 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
private ManagedChannel drmsChannel;
private ResourceServiceGrpc.ResourceServiceBlockingStub
resourceServiceBlockingStub;
private StorageServiceGrpc.StorageServiceBlockingStub
storageServiceBlockingStub;
+ private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub
storagePreferenceServiceBlockingStub;
public DRMSConnector(Configuration configuration) throws Exception {
this.init(configuration);
@@ -41,6 +42,7 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
this.resourceServiceBlockingStub =
ResourceServiceGrpc.newBlockingStub(drmsChannel);
this.storageServiceBlockingStub =
StorageServiceGrpc.newBlockingStub(drmsChannel);
+ this.storagePreferenceServiceBlockingStub =
StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
}
@@ -74,7 +76,7 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
if (transferMapping.getSourceStorage().getStorageCase()
.equals(AnyStorage.StorageCase.SSH_STORAGE)) {
if
(transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname))
{
- transferMappingOp.set(transferMapping);
+ transferMappingOp.set(transferMapping);
}
}
});
@@ -82,35 +84,6 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
return Optional.ofNullable(transferMappingOp.get());
}
- public Optional<String> getDestinationStorageId(DataOrchestratorEntity
entity, String hostname) {
- DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
-
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
- .setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(entity.getOwnerId())
- .setTenantId(entity.getTenantId())
- .build())
- .build();
- FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .build();
- FindTransferMappingsResponse response =
storageServiceBlockingStub.getTransferMappings(request);
- List<TransferMapping> transferMappingList = response.getMappingsList();
- AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
- if (!transferMappingList.isEmpty()) {
- transferMappingList.forEach(transferMapping -> {
- if (transferMapping.getDestinationStorage().getStorageCase()
-
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
- if
(transferMapping.getDestinationStorage().getSshStorage().getHostName().equals(hostname))
{
- storagePreferenceId
-
.set(transferMapping.getDestinationStorage().getSshStorage().getStorageId());
- }
- }
- });
- }
- return Optional.ofNullable(storagePreferenceId.get());
- }
-
public Optional<GenericResource>
createResource(DataOrchestratorEventRepository repository,
DataOrchestratorEntity entity,
String resourceId,
@@ -151,4 +124,34 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
return Optional.empty();
}
}
+
+ public Optional<AnyStoragePreference> getStoragePreference(String
authToken, String username, String tenantId, String storageId) {
+ DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(authToken)
+
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(username)
+ .setTenantId(tenantId)
+ .build())
+ .build();
+ StoragePreferenceSearchQuery searchQuery = StoragePreferenceSearchQuery
+ .newBuilder()
+ .setField("storageId")
+ .setValue(storageId)
+ .build();
+
+ StoragePreferenceSearchRequest storagePreferenceSearchRequest =
StoragePreferenceSearchRequest
+ .newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .addQueries(searchQuery)
+ .build();
+ StoragePreferenceSearchResponse response =
storagePreferenceServiceBlockingStub
+ .searchStoragePreference(storagePreferenceSearchRequest);
+ List<AnyStoragePreference> preferences =
response.getStoragesPreferenceList();
+ if (!preferences.isEmpty()) {
+ return Optional.ofNullable(preferences.get(0));
+ }
+ return Optional.empty();
+ }
+
}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
index d3cbc5e..1844054 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -2,13 +2,8 @@ package org.apache.airavata.datalake.orchestrator.connectors;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.orchestrator.Configuration;
import
org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.DataOrchestratorEventRepository;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.EventStatus;
-import
org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowMessage;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowServiceGrpc;
@@ -48,25 +43,23 @@ public class WorkflowServiceConnector implements
AbstractConnector<Configuration
return false;
}
- public void invokeWorkflow(DataOrchestratorEventRepository repository,
DataOrchestratorEntity entity, GenericResource resource) {
+ public void invokeWorkflow(String username, String tenantId, String
sourceResourceId, String sourceCredentialToken,
+ String dstResourceId, String
destinationCredentialToken) {
try {
- WorkflowServiceAuthToken workflowServiceAuthToken =
WorkflowServiceAuthToken
- .newBuilder()
- .setAccessToken("")
- .build();
WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
- .setResourceId(resource.getResourceId())
+ .setSourceResourceId(sourceResourceId)
+ .setDestinationResourceId(dstResourceId)
+ .setUsername(username)
+ .setTenantId(tenantId)
+ .setSourceCredentialToken(sourceCredentialToken)
+ .setDestinationCredentialToken(destinationCredentialToken)
.build();
-
WorkflowInvocationRequest workflowInvocationRequest =
WorkflowInvocationRequest
-
.newBuilder().setMessage(workflowMessage).setAuthToken(workflowServiceAuthToken).build();
+ .newBuilder().setMessage(workflowMessage).build();
this.workflowServiceStub.invokeWorkflow(workflowInvocationRequest);
} catch (Exception ex) {
- LOGGER.error("Error occurred while invoking workflow engine",
entity.getResourceId(), ex);
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while invoking workflow engine" +
ex.getMessage());
- repository.save(entity);
- return;
+ String msg = "Error occurred while invoking workflow engine " +
ex.getMessage();
+ throw new RuntimeException(msg, ex);
}
}
}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 13d4961..8ad401b 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,6 +1,7 @@
package org.apache.airavata.datalake.orchestrator.processor;
import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.Utils;
@@ -126,8 +127,8 @@ public class OutboundEventProcessor implements
MessageProcessor<Configuration> {
parentId, "FILE");
String dstResourceHost =
transferMapping.getDestinationStorage().getSshStorage().getHostName();
- String destinationResourceId = dstResourceHost+":"+
entity.getResourcePath() + ":" + entity.getResourceType();
- String messageId = Utils.getId(destinationResourceId);
+ String destinationResourceId = dstResourceHost + ":" +
entity.getResourcePath() + ":" + entity.getResourceType();
+ String messageId = Utils.getId(destinationResourceId);
Optional<GenericResource> destinationFile =
this.drmsConnector.createResource(repository, entity, messageId,
entity.getResourceName(),
@@ -136,11 +137,42 @@ public class OutboundEventProcessor implements
MessageProcessor<Configuration> {
"FILE");
if (optionalGenericResource.isPresent() &&
destinationFile.isPresent()) {
- this.workflowServiceConnector.invokeWorkflow(repository,
entity, optionalGenericResource.get());
-
entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
- repository.save(entity);
- } else {
-
+ try {
+
+ Optional<AnyStoragePreference> storagePreferenceOptional =
this.drmsConnector
+ .getStoragePreference(entity.getAuthToken(),
entity.getOwnerId(), entity.getTenantId(), sourceStorageId);
+
+ Optional<AnyStoragePreference>
destinationPreferenceOptional = this.drmsConnector
+ .getStoragePreference(entity.getAuthToken(),
entity.getOwnerId(), entity.getTenantId(), destinationStorageId);
+ if (storagePreferenceOptional.isPresent() &&
destinationPreferenceOptional.isPresent()) {
+ String sourceCredentialToken =
storagePreferenceOptional.get()
+ .getSshStoragePreference()
+ .getCredentialToken();
+ String destinationCredentialToken =
storagePreferenceOptional.get()
+ .getSshStoragePreference()
+ .getCredentialToken();
+
+
this.workflowServiceConnector.invokeWorkflow(entity.getOwnerId(),
+ entity.getTenantId(), entity.getResourceId(),
sourceCredentialToken,
+ messageId, destinationCredentialToken);
+
entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
+ repository.save(entity);
+ } else {
+ String msg = "Cannot find storage preference for
storage " + sourceStorageId + " for user " + entity.getOwnerId();
+ entity.setError(msg);
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ repository.save(entity);
+ LOGGER.error(msg);
+ }
+
+
+ } catch (Exception exception) {
+ String msg = "Error occurred while invoking workflow
manager" + exception.getMessage();
+ entity.setError("Error occurred while invoking workflow
manager " + exception.getMessage());
+ entity.setEventStatus(EventStatus.ERRORED.name());
+ repository.save(entity);
+ LOGGER.error(msg, exception);
+ }
}
} catch (Exception exception) {
LOGGER.error("Error occurred while processing outbound data
orchestrator event", exception);
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
b/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
index 83e45c5..84073c9 100644
--- a/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
+++ b/data-orchestrator/workflow-engine/datasync-workflow-manager/pom.xml
@@ -48,6 +48,12 @@
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.dozer</groupId>
+ <artifactId>dozer</artifactId>
+ <version>5.5.1</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
index 014709f..da2d886 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
+++
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
@@ -19,6 +19,7 @@ package
org.apache.airavata.datalake.workflow.engine.wm.datasync;
import org.apache.airavata.datalake.mft.listener.DataTransferEvent;
import org.apache.airavata.datalake.mft.listener.DataTransferEventDeserializer;
+import
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowInvocationRequest;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask;
import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskUtil;
import
org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.AsyncDataTransferTask;
@@ -31,6 +32,7 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import java.time.Duration;
import java.util.*;
@@ -64,6 +66,19 @@ public class DataSyncWorkflowManager {
@org.springframework.beans.factory.annotation.Value("${datasync.wm.grpc.port}")
private int datasyncWmPort;
+
+ @Value("${mft.callback.url}")
+ private String callbackURL;
+ @Value("${mft.host}")
+ private String mftHost;
+ @Value("${mft.port}")
+ private int mftPort;
+ @Value("${mft.clientId}")
+ private String mftClientId;
+ @Value("${mft.clientSecret}")
+ private String mftClientSecret;
+
+
@Autowired
private CallbackWorkflowStore callbackWorkflowStore;
@@ -161,21 +176,22 @@ public class DataSyncWorkflowManager {
logger.info("Successfully initialized DatasyncWorkflow Manager");
}
- public void submitDataSyncWorkflow() throws Exception {
+ public void submitDataSyncWorkflow(WorkflowInvocationRequest
workflowInvocationRequest) throws Exception {
AsyncDataTransferTask dt1 = new AsyncDataTransferTask();
- dt1.setSourceResourceId("");
- dt1.setDestinationResourceId("");
- dt1.setSourceCredToken("");
- dt1.setDestinationCredToken("");
- dt1.setCallbackUrl("localhost:33335");
- dt1.setMftHost("localhost");
- dt1.setMftPort(7004);
- dt1.setMftClientId("");
- dt1.setMftClientSecret("");
- dt1.setUserId("dimuthu");
+
dt1.setSourceResourceId(workflowInvocationRequest.getMessage().getSourceResourceId());
+
dt1.setDestinationResourceId(workflowInvocationRequest.getMessage().getDestinationResourceId());
+
dt1.setSourceCredToken(workflowInvocationRequest.getMessage().getSourceCredentialToken());
+
dt1.setDestinationCredToken(workflowInvocationRequest.getMessage().getDestinationCredentialToken());
+ dt1.setTenantId(workflowInvocationRequest.getMessage().getTenantId());
+ dt1.setCallbackUrl(callbackURL);
+ dt1.setMftHost(mftHost);
+ dt1.setMftPort(mftPort);
+ dt1.setMftClientId(mftClientId);
+ dt1.setMftClientSecret(mftClientSecret);
+ dt1.setUserId(workflowInvocationRequest.getMessage().getUsername());
dt1.setCurrentSection(1);
dt1.setTaskId("dt-" + UUID.randomUUID().toString());
- dt1.setMftCallbackStoreHost(datasyncWmHost);
+ dt1.setMftCallbackStoreHost(datasyncWmHost);
dt1.setMftCallbackStorePort(datasyncWmPort);
Map<String, AbstractTask> taskMap = new HashMap<>();
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index 3b88944..b772833 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -44,7 +44,7 @@ public class WorkflowEngineAPIHandler extends
WorkflowServiceGrpc.WorkflowServic
public void invokeWorkflow(WorkflowInvocationRequest request,
StreamObserver<WorkflowInvocationResponse>
responseObserver) {
try {
- dataSyncWorkflowManager.submitDataSyncWorkflow();
+ dataSyncWorkflowManager.submitDataSyncWorkflow(request);
responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
responseObserver.onCompleted();
} catch (Exception ex) {
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
index 9a1b4f9..7df994f 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
+++
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
@@ -29,4 +29,10 @@ kafka.mft.status.consumer.group=mft-even-group
datasync.wm.grpc.host=localhost
datasync.wm.grpc.port=6565
-spring.config.use-legacy-processing=true
\ No newline at end of file
+spring.config.use-legacy-processing=true
+
+mft.callback.url=localhost:33335
+mft.host=localhost
+mft.port=7004
+mft.clientId=wsedcfvrtgrtgrtg
+mft.clientSecret=rtgrtgrtgrtgt
\ No newline at end of file
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
b/data-orchestrator/workflow-engine/workflow-engine-controller/src/main/resources/application.properties
similarity index 74%
copy from
data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
copy to
data-orchestrator/workflow-engine/workflow-engine-controller/src/main/resources/application.properties
index 9a1b4f9..9caa8a2 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/resources/application.properties
+++
b/data-orchestrator/workflow-engine/workflow-engine-controller/src/main/resources/application.properties
@@ -20,13 +20,7 @@
cluster.name=datalake
zookeeper.connection=localhost:2181
-datasync.wm.name=datasync_wf
+controller.name=helix_controller
+task.list.file=task-list.yaml
-kafka.url=localhost:9092
-kafka.mft.publisher.name=mft-status-publisher
-kafka.mft.status.publish.topic=mft-status-topic
-kafka.mft.status.consumer.group=mft-even-group
-
-datasync.wm.grpc.host=localhost
-datasync.wm.grpc.port=6565
-spring.config.use-legacy-processing=true
\ No newline at end of file
+server.port=33335
\ No newline at end of file
diff --git
a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
index a3c400e..4d9c155 100644
---
a/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
+++
b/data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/AsyncDataTransferTask.java
@@ -60,6 +60,9 @@ public class AsyncDataTransferTask extends
BiSectionNonBlockingTask {
@TaskParam(name = "MFTCallbackStorePort")
private final ThreadLocal<Integer> mftCallbackStorePort = new
ThreadLocal<>();
+ @TaskParam(name = "TenantId")
+ private final ThreadLocal<String> tenantId = new ThreadLocal<>();
+
public TaskResult beforeSection() {
MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient =
MFTApiClient.buildClient(getMftHost(), getMftPort());
@@ -176,7 +179,7 @@ public class AsyncDataTransferTask extends
BiSectionNonBlockingTask {
}
public void setMftClientSecret(String mftClientSecret) {
- this.mftClientSecret.set( mftClientSecret);
+ this.mftClientSecret.set(mftClientSecret);
}
public String getMftCallbackStoreHost() {
@@ -194,4 +197,13 @@ public class AsyncDataTransferTask extends
BiSectionNonBlockingTask {
public void setMftCallbackStorePort(Integer mftCallbackStorePort) {
this.mftCallbackStorePort.set(mftCallbackStorePort);
}
+
+ public String getTenantId() {
+ return tenantId.get();
+ }
+
+ public void setTenantId(String tenantId) {
+ this.tenantId.set(tenantId);
+ }
+
}
diff --git
a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index a3557b1..c814b19 100644
---
a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++
b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -27,10 +27,15 @@ import "google/protobuf/empty.proto";
message WorkflowMessage {
string message_id = 1;
string resource_id = 2;
+ string source_resource_id = 3;
+ string destination_resource_id = 4;
+ string username = 5;
+ string tenantId = 6;
+ string source_credential_token = 7;
+ string destination_credential_token = 8;
}
message WorkflowInvocationRequest {
-
org.apache.airavata.datalake.orchestrator.workflow.WorkflowServiceAuthToken
auth_token = 1;
WorkflowMessage message = 2;
}