This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new caa27c4 Bug fix drms
new 16b56bb Merge pull request #24 from isururanawaka/workflow_invocation
caa27c4 is described below
commit caa27c47f8ec8cba905f09926ac6f113fe0669e5
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Sun Jul 11 12:17:49 2021 -0400
Bug fix drms
---
.../orchestrator/connectors/DRMSConnector.java | 121 ++++++++++-----------
.../processor/OutboundEventProcessor.java | 28 ++++-
.../drms-rest-proxy/src/main/resources/drms.pb | Bin 109344 -> 109314 bytes
.../src/main/proto/storage/StorageService.proto | 6 +-
4 files changed, 83 insertions(+), 72 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index f87542e..4f9cbab 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -28,7 +28,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
private ManagedChannel drmsChannel;
private ResourceServiceGrpc.ResourceServiceBlockingStub
resourceServiceBlockingStub;
- private StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub
storagePreferenceServiceBlockingStub;
+ private StorageServiceGrpc.StorageServiceBlockingStub
storageServiceBlockingStub;
public DRMSConnector(Configuration configuration) throws Exception {
this.init(configuration);
@@ -40,7 +40,7 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
.forAddress(configuration.getOutboundEventProcessor().getDrmsHost(),
configuration.getOutboundEventProcessor().getDrmsPort()).usePlaintext().build();
this.resourceServiceBlockingStub =
ResourceServiceGrpc.newBlockingStub(drmsChannel);
- this.storagePreferenceServiceBlockingStub =
StoragePreferenceServiceGrpc.newBlockingStub(drmsChannel);
+ this.storageServiceBlockingStub =
StorageServiceGrpc.newBlockingStub(drmsChannel);
}
@@ -54,67 +54,62 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
return !this.drmsChannel.isShutdown();
}
-// public Optional<String>
getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-// DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
-// .setAccessToken(entity.getAuthToken())
-//
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-// .setUsername(entity.getOwnerId())
-// .setTenantId(entity.getTenantId())
-// .build())
-// .build();
-// FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
-// .setAuthToken(serviceAuthToken)
-// .build();
-// FindTransferMappingsResponse response =
storagePreferenceServiceBlockingStub.getTransferMappings(request);
-// List<TransferMapping> transferMappingList =
response.getMappingsList();
-// AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
-// if (!transferMappingList.isEmpty()) {
-// transferMappingList.forEach(transferMapping -> {
-// if
(transferMapping.getSourceStoragePreference().getStorageCase()
-//
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-// if
(transferMapping.getSourceStoragePreference().getSshStoragePreference()
-// .getStorage().getHostName().equals(hostname)) {
-// storagePreferenceId
-//
.set(transferMapping.getSourceStoragePreference()
-//
.getSshStoragePreference().getStoragePreferenceId());
-// }
-// }
-// });
-// }
-// return Optional.ofNullable(storagePreferenceId.get());
-// }
-//
-// public Optional<String>
getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String
hostname) {
-// DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
-// .setAccessToken(entity.getAuthToken())
-//
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-// .setUsername(entity.getOwnerId())
-// .setTenantId(entity.getTenantId())
-// .build())
-// .build();
-// FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
-// .setAuthToken(serviceAuthToken)
-// .build();
-// FindTransferMappingsResponse response =
storagePreferenceServiceBlockingStub.getTransferMappings(request);
-// List<TransferMapping> transferMappingList =
response.getMappingsList();
-// AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
-// if (!transferMappingList.isEmpty()) {
-// transferMappingList.forEach(transferMapping -> {
-// if
(transferMapping.getDestinationStoragePreference().getStorageCase()
-//
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-// if
(transferMapping.getDestinationStoragePreference().getSshStoragePreference()
-// .getStorage().getHostName().equals(hostname)) {
-// storagePreferenceId
-//
.set(transferMapping.getDestinationStoragePreference()
-//
.getSshStoragePreference().getStoragePreferenceId());
-// }
-// }
-// });
-// }
-// return Optional.ofNullable(storagePreferenceId.get());
-// }
+ public Optional<TransferMapping>
getActiveTransferMapping(DataOrchestratorEntity entity, String hostname) {
+ DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+ FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .build();
+ FindTransferMappingsResponse response =
storageServiceBlockingStub.getTransferMappings(request);
+ List<TransferMapping> transferMappingList = response.getMappingsList();
+ AtomicReference<TransferMapping> transferMappingOp = new
AtomicReference<>(null);
+ if (!transferMappingList.isEmpty()) {
+ transferMappingList.forEach(transferMapping -> {
+ if (transferMapping.getSourceStorage().getStorageCase()
+
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+ if
(transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname))
{
+ transferMappingOp.set(transferMapping);
+ }
+ }
+ });
+ }
+ return Optional.ofNullable(transferMappingOp.get());
+ }
+
+ public Optional<String> getDestinationStorageId(DataOrchestratorEntity
entity, String hostname) {
+ DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
+ .setAccessToken(entity.getAuthToken())
+
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+ .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+ .setUsername(entity.getOwnerId())
+ .setTenantId(entity.getTenantId())
+ .build())
+ .build();
+ FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .build();
+ FindTransferMappingsResponse response =
storageServiceBlockingStub.getTransferMappings(request);
+ List<TransferMapping> transferMappingList = response.getMappingsList();
+ AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
+ if (!transferMappingList.isEmpty()) {
+ transferMappingList.forEach(transferMapping -> {
+ if (transferMapping.getDestinationStorage().getStorageCase()
+
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+ if
(transferMapping.getDestinationStorage().getSshStorage().getHostName().equals(hostname))
{
+ storagePreferenceId
+
.set(transferMapping.getDestinationStorage().getSshStorage().getStorageId());
+ }
+ }
+ });
+ }
+ return Optional.ofNullable(storagePreferenceId.get());
+ }
public Optional<GenericResource>
createResource(DataOrchestratorEventRepository repository,
DataOrchestratorEntity entity,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 013ee59..522d8a7 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -1,6 +1,7 @@
package org.apache.airavata.datalake.orchestrator.processor;
import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
@@ -89,27 +90,28 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
String tail =
resourcePath.substring(resourcePath.indexOf(ownerId));
String[] collections = tail.split("/");
-// Optional<String> optionalStorPref =
drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
- Optional<String> optionalStorPref = null;
+ Optional<TransferMapping> optionalStorPref =
drmsConnector.getActiveTransferMapping(entity, entity.getHostName());
if (optionalStorPref.isEmpty()) {
entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("StoragePreference not found for host: " +
entity.getHostName());
+ entity.setError("Storage not found for host: " +
entity.getHostName());
repository.save(entity);
return;
}
- String parentId = optionalStorPref.get();
+ TransferMapping transferMapping = optionalStorPref.get();
+ String sourceStorageId =
transferMapping.getSourceStorage().getSshStorage().getStorageId();
+ String destinationStorageId =
transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+ String parentId = sourceStorageId;
for (int i = 1; i < collections.length - 1; i++) {
String resourceName = collections[i];
String path = entity.getResourcePath().substring(0,
entity.getResourcePath().indexOf(resourceName));
path = path.concat(resourceName);
String entityId = Utils.getId(path);
Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(repository, entity,
entityId, resourceName, path, parentId, "COLLECTION");
+ this.drmsConnector.createResource(repository, entity,
entityId, resourceName, path, sourceStorageId, "COLLECTION");
if (optionalGenericResource.isPresent()) {
parentId = optionalGenericResource.get().getResourceId();
-
} else {
entity.setEventStatus(EventStatus.ERRORED.name());
entity.setError("Collection structure creation failed: " +
entity.getHostName());
@@ -123,6 +125,20 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
collections[collections.length - 1],
entity.getResourcePath(),
parentId, "FILE");
+ String dstResourceHost =
transferMapping.getDestinationStorage().getSshStorage().getHostName();
+ String destinationResourceId = dstResourceHost+":"+
entity.getResourcePath() + ":" + entity.getResourceType();
+ String messageId = Utils.getId(destinationResourceId);
+
+ Optional<GenericResource> destinationFile =
this.drmsConnector.createResource(repository, entity, messageId,
+ entity.getResourceName(),
+ entity.getResourcePath(),
+ destinationStorageId,
+ "FILE");
+
+ Optional<GenericResource> optionalGenericResourceDST =
+ this.drmsConnector.createResource(repository, entity,
messageId,
+ collections[collections.length - 1],
entity.getResourcePath(),
+ destinationStorageId, "FILE");
if (optionalGenericResource.isPresent()) {
this.workflowServiceConnector.invokeWorkflow(repository,
entity, optionalGenericResource.get());
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 0227833..1b745c1 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
index 3f5b3a2..bd89026 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
@@ -167,19 +167,19 @@ service StorageService {
rpc createTransferMapping (CreateTransferMappingRequest) returns
(CreateTransferMappingResponse) {
option (google.api.http) = {
- post: "/v1.0/api/drms/storagePreference/transferMapping"
+ post: "/v1.0/api/drms/storage/transferMapping"
};
}
rpc getTransferMappings (FindTransferMappingsRequest) returns
(FindTransferMappingsResponse) {
option (google.api.http) = {
- get: "/v1.0/api/drms/storagePreference/transferMapping"
+ get: "/v1.0/api/drms/storage/transferMapping"
};
}
rpc deleteTransferMappings (DeleteTransferMappingRequest) returns
(google.protobuf.Empty) {
option (google.api.http) = {
- delete: "/v1.0/api/drms/storagePreference/transferMapping"
+ delete: "/v1.0/api/drms/storage/transferMapping"
};
}
}
\ No newline at end of file